GNU Radio 3.3.0 C++ API
gc_job_manager_impl.h
Go to the documentation of this file.
00001 /* -*- c++ -*- */
00002 /*
00003  * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
00004  * 
00005  * This file is part of GNU Radio
00006  * 
00007  * GNU Radio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 3, or (at your option)
00010  * any later version.
00011  * 
00012  * GNU Radio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License along
00018  * with this program; if not, write to the Free Software Foundation, Inc.,
00019  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00020  */
00021 
00022 #ifndef INCLUDED_GC_JOB_MANAGER_IMPL_H
00023 #define INCLUDED_GC_JOB_MANAGER_IMPL_H
00024 
00025 #include <gcell/gc_job_manager.h>
00026 #include <gcell/gc_jd_stack.h>
00027 #include <gcell/gc_jd_queue.h>
00028 #include <gcell/gc_spu_args.h>
00029 #include "gc_client_thread_info.h"
00030 #include <libspe2.h>
00031 #include <vector>
00032 #include <boost/scoped_array.hpp>
00033 
00034 typedef boost::shared_ptr<spe_gang_context> spe_gang_context_sptr;
00035 typedef boost::shared_ptr<spe_program_handle_t> spe_program_handle_sptr;
00036 typedef boost::scoped_array<gc_client_thread_info> gc_client_thread_info_sa;
00037 
00038 
00039 enum worker_state {
00040   WS_FREE,      // not in use
00041   WS_INIT,      // allocated and being initialized
00042   WS_RUNNING,   // the thread is running
00043   WS_DEAD,      // the thread is dead
00044 };
00045 
00046 struct worker_ctx {
00047   volatile worker_state   state;
00048   unsigned int            spe_idx;      // [0, nspes-1]
00049   spe_context_ptr_t       spe_ctx;
00050   spe_spu_control_area_t *spe_ctrl;
00051   pthread_t               thread;
00052   gc_spu_args_t          *spu_args;     // pointer to 16-byte aligned struct
00053 
00054   worker_ctx()
00055     : state(WS_FREE), spe_idx(0), spe_ctx(0), spe_ctrl(0),
00056       thread(0), spu_args(0) {}
00057   ~worker_ctx();
00058 };
00059 
00060 enum evt_handler_state {
00061   EHS_INIT,             // being initialized
00062   EHS_RUNNING,          // thread is running
00063   EHS_SHUTTING_DOWN,    // in process of shutting down everything
00064   EHS_WAITING_FOR_WORKERS_TO_DIE,
00065   EHS_DEAD,             // thread is dead
00066 };
00067 
00068 enum job_completer_state {
00069   JCS_INIT,             // being initialized
00070   JCS_RUNNING,          // thread is running
00071   JCS_DEAD,             // thread is dead
00072 };
00073 
00074 struct spe_event_handler {
00075   spe_event_handler_ptr_t       ptr;
00076 
00077   spe_event_handler() : ptr(0) {}
00078   ~spe_event_handler(){
00079     if (ptr){
00080       if (spe_event_handler_destroy(ptr) != 0){
00081         perror("spe_event_handler_destroy");
00082       }
00083     }
00084   }
00085 };
00086 
00087 
00088 /*!
00089  * \brief Concrete class that manages SPE jobs.
00090  *
00091  * This class contains all the implementation details.
00092  */
00093 class gc_job_manager_impl : public gc_job_manager
00094 {
00095   enum { MAX_SPES =  16 };
00096 
00097   int                     d_debug;
00098   gc_jm_options           d_options;
00099   spe_program_handle_sptr d_spe_image;
00100   spe_gang_context_sptr   d_gang;               // boost::shared_ptr
00101 
00102   worker_ctx             d_worker[MAX_SPES];    // SPE ctx, thread, etc
00103   gc_spu_args_t         *d_spu_args;            // 16-byte aligned structs
00104   boost::shared_ptr<void> _d_spu_args_boost;    // hack for automatic storage mgmt
00105 
00106   gc_comp_info_t        *d_comp_info;           // 128-byte aligned structs
00107   boost::shared_ptr<void> _d_comp_info_boost;   // hack for automatic storage mgmt
00108 
00109   // used to coordinate communication w/ the event handling thread
00110   boost::mutex           d_eh_mutex;
00111   boost::condition_variable d_eh_cond;
00112   pthread_t              d_eh_thread;           // the event handler thread
00113   volatile evt_handler_state    d_eh_state;
00114   volatile bool                 d_shutdown_requested;
00115   spe_event_handler      d_spe_event_handler;
00116   
00117   // used to coordinate communication w/ the job completer thread
00118   boost::mutex           d_jc_mutex;
00119   boost::condition_variable d_jc_cond;
00120   pthread_t              d_jc_thread;           // the job completion thread
00121   volatile job_completer_state  d_jc_state;
00122   int                    d_jc_njobs_active;     // # of jobs submitted but not yet reaped
00123 
00124   // round robin notification of spes
00125   int                    d_ntell;               // # of spes to tell
00126   unsigned int           d_tell_start;          // which one to start with
00127 
00128   // All of the job descriptors are hung off of here.
00129   // We allocate them all in a single cache aligned chunk.
00130   gc_job_desc_t         *d_jd;                  // [options.max_jobs]
00131   boost::shared_ptr<void> _d_jd_boost;          // hack for automatic storage mgmt
00132 
00133   gc_client_thread_info_sa d_client_thread;     // [options.max_client_threads]
00134 
00135   // We use bitvectors to represent the completing state of a job.  Each
00136   // bitvector is d_bvlen longs in length.
00137   int                    d_bvlen;               // bit vector length in longs
00138 
00139   // This contains the storage for all the bitvectors used by the job
00140   // manager.  There's 1 for each client thread, in the d_jobs_done
00141   // field.  We allocate them all in a single cache aligned chunk.
00142   boost::shared_ptr<void> _d_all_bitvectors;    // hack for automatic storage mgmt
00143 
00144   // Lock free stack where we keep track of the free job descriptors.
00145   gc_jd_stack_t         *d_free_list;           // stack of free job descriptors
00146   boost::shared_ptr<void> _d_free_list_boost;   // hack for automatic storage mgmt
00147 
00148   // The PPE inserts jobs here; SPEs pull jobs from here.
00149   gc_jd_queue_t         *d_queue;               // job queue
00150   boost::shared_ptr<void> _d_queue_boost;       // hack for automatic storage mgmt
00151 
00152   int                    d_ea_args_maxsize;
00153 
00154   struct gc_proc_def    *d_proc_def;            // the SPE procedure table
00155   uint32_t               d_proc_def_ls_addr;    // the LS address of the table
00156   int                    d_nproc_defs;          // number of proc_defs in table
00157 
00158   gc_client_thread_info *alloc_cti();
00159   void free_cti(gc_client_thread_info *cti);
00160 
00161   void create_event_handler();
00162   void set_eh_state(evt_handler_state s);
00163   void set_ea_args_maxsize(int maxsize);
00164 
00165   void notify_clients_jobs_are_done(unsigned int spe_num,
00166                                     unsigned int completion_info_idx);
00167 
00168 public:
00169   void event_handler_loop();    // really private
00170   void job_completer_loop();    // really private
00171 
00172 private:
00173   bool send_all_spes(uint32_t msg);
00174   bool send_spe(unsigned int spe, uint32_t msg);
00175   void print_event(spe_event_unit_t *evt);
00176   void handle_event(spe_event_unit_t *evt);
00177   bool incr_njobs_active();
00178   void decr_njobs_active(int n);
00179   void tell_spes_to_check_queue();
00180   void poll_for_job_completion();
00181 
00182   // bitvector ops
00183   void bv_zero(unsigned long *bv);
00184   void bv_clr(unsigned long *bv, unsigned int bitno);
00185   void bv_set(unsigned long *bv, unsigned int bitno);
00186   bool bv_isset(unsigned long *bv, unsigned int bitno);
00187   bool bv_isclr(unsigned long *bv, unsigned int bitno);
00188 
00189   void setup_logfiles();
00190   void sync_logfiles();
00191   void unmap_logfiles();
00192 
00193   friend gc_job_manager_sptr gc_make_job_manager(const gc_jm_options *options);
00194   
00195   gc_job_manager_impl(const gc_jm_options *options = 0);
00196 
00197 public:
00198   virtual ~gc_job_manager_impl();
00199 
00200   /*!
00201    * Stop accepting new jobs.  Wait for existing jobs to complete.
00202    * Return all managed SPE's to the system.
00203    */
00204   virtual bool shutdown();
00205 
00206   /*!
00207    * \brief Return number of SPE's currently allocated to job manager.
00208    */
00209   virtual int nspes() const;
00210 
00211   /*!
00212    * \brief Return a pointer to a properly aligned job descriptor,
00213    * or zero if none are available.
00214    */
00215   virtual gc_job_desc *alloc_job_desc();
00216 
00217   /*
00218    *! Return a job descriptor previously allocated with alloc_job_desc()
00219    *
00220    * \param[in] jd pointer to job descriptor to free.
00221    */
00222   virtual void free_job_desc(gc_job_desc *jd);
00223 
00224   /*!
00225    * \brief Submit a job for asynchronous processing on an SPE.
00226    *
00227    * \param[in] jd pointer to job description
00228    *
00229    * The caller must not read or write the job description
00230    * or any of the memory associated with any indirect arguments
00231    * until after calling wait_job.
00232    *
00233    * \returns true iff the job was successfully enqueued.
00234    * If submit_job returns false, check jd->status for additional info.
00235    */
00236   virtual bool submit_job(gc_job_desc *jd);
00237 
00238   /*!
00239    * \brief Wait for job to complete.
00240    *
00241    * A thread may only wait for jobs which it submitted.
00242    *
00243    * \returns true if sucessful, else false.
00244    */
00245   virtual bool 
00246   wait_job(gc_job_desc *jd);
00247 
00248   /*!
00249    * \brief wait for 1 or more jobs to complete.
00250    *
00251    * \param[in] njobs is the length of arrays \p jd and \p done.
00252    * \param[in] jd are the jobs that are to be waited for.
00253    * \param[out] done indicates whether the corresponding job is complete.
00254    * \param[in] mode indicates whether to wait for ALL or ANY of the jobs
00255    *   in \p jd to complete.
00256    *
00257    * A thread may only wait for jobs which it submitted.
00258    *
00259    * \returns number of jobs completed, or -1 if error.
00260    */
00261   virtual int
00262   wait_jobs(unsigned int njobs,
00263             gc_job_desc *jd[], bool done[], gc_wait_mode mode);
00264 
00265   virtual int ea_args_maxsize();
00266 
00267   virtual gc_proc_id_t lookup_proc(const std::string &name);
00268   virtual std::vector<std::string> proc_names();
00269 
00270   virtual void set_debug(int debug);
00271   virtual int debug();
00272 };
00273 
00274 #endif /* INCLUDED_GC_JOB_MANAGER_IMPL_H */