GNU Radio 3.3.0 C++ API
|
00001 /* -*- c++ -*- */ 00002 /* 00003 * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc. 00004 * 00005 * This file is part of GNU Radio 00006 * 00007 * GNU Radio is free software; you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation; either version 3, or (at your option) 00010 * any later version. 00011 * 00012 * GNU Radio is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License along 00018 * with this program; if not, write to the Free Software Foundation, Inc., 00019 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 00020 */ 00021 00022 #ifndef INCLUDED_GC_JOB_MANAGER_IMPL_H 00023 #define INCLUDED_GC_JOB_MANAGER_IMPL_H 00024 00025 #include <gcell/gc_job_manager.h> 00026 #include <gcell/gc_jd_stack.h> 00027 #include <gcell/gc_jd_queue.h> 00028 #include <gcell/gc_spu_args.h> 00029 #include "gc_client_thread_info.h" 00030 #include <libspe2.h> 00031 #include <vector> 00032 #include <boost/scoped_array.hpp> 00033 00034 typedef boost::shared_ptr<spe_gang_context> spe_gang_context_sptr; 00035 typedef boost::shared_ptr<spe_program_handle_t> spe_program_handle_sptr; 00036 typedef boost::scoped_array<gc_client_thread_info> gc_client_thread_info_sa; 00037 00038 00039 enum worker_state { 00040 WS_FREE, // not in use 00041 WS_INIT, // allocated and being initialized 00042 WS_RUNNING, // the thread is running 00043 WS_DEAD, // the thread is dead 00044 }; 00045 00046 struct worker_ctx { 00047 volatile worker_state state; 00048 unsigned int spe_idx; // [0, nspes-1] 00049 spe_context_ptr_t spe_ctx; 00050 spe_spu_control_area_t *spe_ctrl; 00051 pthread_t thread; 00052 gc_spu_args_t *spu_args; // pointer to 16-byte aligned struct 00053 00054 worker_ctx() 00055 : state(WS_FREE), spe_idx(0), spe_ctx(0), spe_ctrl(0), 00056 thread(0), spu_args(0) {} 00057 ~worker_ctx(); 00058 }; 00059 00060 enum evt_handler_state { 00061 EHS_INIT, // being initialized 00062 EHS_RUNNING, // thread is running 00063 EHS_SHUTTING_DOWN, // in process of shutting down everything 00064 EHS_WAITING_FOR_WORKERS_TO_DIE, 00065 EHS_DEAD, // thread is dead 00066 }; 00067 00068 enum job_completer_state { 00069 JCS_INIT, // being initialized 00070 JCS_RUNNING, // thread is running 00071 JCS_DEAD, // thread is dead 00072 }; 00073 00074 struct spe_event_handler { 00075 spe_event_handler_ptr_t ptr; 00076 00077 spe_event_handler() : ptr(0) {} 00078 ~spe_event_handler(){ 00079 if (ptr){ 00080 if (spe_event_handler_destroy(ptr) != 0){ 00081 perror("spe_event_handler_destroy"); 00082 } 00083 } 00084 } 00085 }; 00086 00087 00088 /*! 00089 * \brief Concrete class that manages SPE jobs. 00090 * 00091 * This class contains all the implementation details. 00092 */ 00093 class gc_job_manager_impl : public gc_job_manager 00094 { 00095 enum { MAX_SPES = 16 }; 00096 00097 int d_debug; 00098 gc_jm_options d_options; 00099 spe_program_handle_sptr d_spe_image; 00100 spe_gang_context_sptr d_gang; // boost::shared_ptr 00101 00102 worker_ctx d_worker[MAX_SPES]; // SPE ctx, thread, etc 00103 gc_spu_args_t *d_spu_args; // 16-byte aligned structs 00104 boost::shared_ptr<void> _d_spu_args_boost; // hack for automatic storage mgmt 00105 00106 gc_comp_info_t *d_comp_info; // 128-byte aligned structs 00107 boost::shared_ptr<void> _d_comp_info_boost; // hack for automatic storage mgmt 00108 00109 // used to coordinate communication w/ the event handling thread 00110 boost::mutex d_eh_mutex; 00111 boost::condition_variable d_eh_cond; 00112 pthread_t d_eh_thread; // the event handler thread 00113 volatile evt_handler_state d_eh_state; 00114 volatile bool d_shutdown_requested; 00115 spe_event_handler d_spe_event_handler; 00116 00117 // used to coordinate communication w/ the job completer thread 00118 boost::mutex d_jc_mutex; 00119 boost::condition_variable d_jc_cond; 00120 pthread_t d_jc_thread; // the job completion thread 00121 volatile job_completer_state d_jc_state; 00122 int d_jc_njobs_active; // # of jobs submitted but not yet reaped 00123 00124 // round robin notification of spes 00125 int d_ntell; // # of spes to tell 00126 unsigned int d_tell_start; // which one to start with 00127 00128 // All of the job descriptors are hung off of here. 00129 // We allocate them all in a single cache aligned chunk. 00130 gc_job_desc_t *d_jd; // [options.max_jobs] 00131 boost::shared_ptr<void> _d_jd_boost; // hack for automatic storage mgmt 00132 00133 gc_client_thread_info_sa d_client_thread; // [options.max_client_threads] 00134 00135 // We use bitvectors to represent the completing state of a job. Each 00136 // bitvector is d_bvlen longs in length. 00137 int d_bvlen; // bit vector length in longs 00138 00139 // This contains the storage for all the bitvectors used by the job 00140 // manager. There's 1 for each client thread, in the d_jobs_done 00141 // field. We allocate them all in a single cache aligned chunk. 00142 boost::shared_ptr<void> _d_all_bitvectors; // hack for automatic storage mgmt 00143 00144 // Lock free stack where we keep track of the free job descriptors. 00145 gc_jd_stack_t *d_free_list; // stack of free job descriptors 00146 boost::shared_ptr<void> _d_free_list_boost; // hack for automatic storage mgmt 00147 00148 // The PPE inserts jobs here; SPEs pull jobs from here. 00149 gc_jd_queue_t *d_queue; // job queue 00150 boost::shared_ptr<void> _d_queue_boost; // hack for automatic storage mgmt 00151 00152 int d_ea_args_maxsize; 00153 00154 struct gc_proc_def *d_proc_def; // the SPE procedure table 00155 uint32_t d_proc_def_ls_addr; // the LS address of the table 00156 int d_nproc_defs; // number of proc_defs in table 00157 00158 gc_client_thread_info *alloc_cti(); 00159 void free_cti(gc_client_thread_info *cti); 00160 00161 void create_event_handler(); 00162 void set_eh_state(evt_handler_state s); 00163 void set_ea_args_maxsize(int maxsize); 00164 00165 void notify_clients_jobs_are_done(unsigned int spe_num, 00166 unsigned int completion_info_idx); 00167 00168 public: 00169 void event_handler_loop(); // really private 00170 void job_completer_loop(); // really private 00171 00172 private: 00173 bool send_all_spes(uint32_t msg); 00174 bool send_spe(unsigned int spe, uint32_t msg); 00175 void print_event(spe_event_unit_t *evt); 00176 void handle_event(spe_event_unit_t *evt); 00177 bool incr_njobs_active(); 00178 void decr_njobs_active(int n); 00179 void tell_spes_to_check_queue(); 00180 void poll_for_job_completion(); 00181 00182 // bitvector ops 00183 void bv_zero(unsigned long *bv); 00184 void bv_clr(unsigned long *bv, unsigned int bitno); 00185 void bv_set(unsigned long *bv, unsigned int bitno); 00186 bool bv_isset(unsigned long *bv, unsigned int bitno); 00187 bool bv_isclr(unsigned long *bv, unsigned int bitno); 00188 00189 void setup_logfiles(); 00190 void sync_logfiles(); 00191 void unmap_logfiles(); 00192 00193 friend gc_job_manager_sptr gc_make_job_manager(const gc_jm_options *options); 00194 00195 gc_job_manager_impl(const gc_jm_options *options = 0); 00196 00197 public: 00198 virtual ~gc_job_manager_impl(); 00199 00200 /*! 00201 * Stop accepting new jobs. Wait for existing jobs to complete. 00202 * Return all managed SPE's to the system. 00203 */ 00204 virtual bool shutdown(); 00205 00206 /*! 00207 * \brief Return number of SPE's currently allocated to job manager. 00208 */ 00209 virtual int nspes() const; 00210 00211 /*! 00212 * \brief Return a pointer to a properly aligned job descriptor, 00213 * or zero if none are available. 00214 */ 00215 virtual gc_job_desc *alloc_job_desc(); 00216 00217 /* 00218 *! Return a job descriptor previously allocated with alloc_job_desc() 00219 * 00220 * \param[in] jd pointer to job descriptor to free. 00221 */ 00222 virtual void free_job_desc(gc_job_desc *jd); 00223 00224 /*! 00225 * \brief Submit a job for asynchronous processing on an SPE. 00226 * 00227 * \param[in] jd pointer to job description 00228 * 00229 * The caller must not read or write the job description 00230 * or any of the memory associated with any indirect arguments 00231 * until after calling wait_job. 00232 * 00233 * \returns true iff the job was successfully enqueued. 00234 * If submit_job returns false, check jd->status for additional info. 00235 */ 00236 virtual bool submit_job(gc_job_desc *jd); 00237 00238 /*! 00239 * \brief Wait for job to complete. 00240 * 00241 * A thread may only wait for jobs which it submitted. 00242 * 00243 * \returns true if sucessful, else false. 00244 */ 00245 virtual bool 00246 wait_job(gc_job_desc *jd); 00247 00248 /*! 00249 * \brief wait for 1 or more jobs to complete. 00250 * 00251 * \param[in] njobs is the length of arrays \p jd and \p done. 00252 * \param[in] jd are the jobs that are to be waited for. 00253 * \param[out] done indicates whether the corresponding job is complete. 00254 * \param[in] mode indicates whether to wait for ALL or ANY of the jobs 00255 * in \p jd to complete. 00256 * 00257 * A thread may only wait for jobs which it submitted. 00258 * 00259 * \returns number of jobs completed, or -1 if error. 00260 */ 00261 virtual int 00262 wait_jobs(unsigned int njobs, 00263 gc_job_desc *jd[], bool done[], gc_wait_mode mode); 00264 00265 virtual int ea_args_maxsize(); 00266 00267 virtual gc_proc_id_t lookup_proc(const std::string &name); 00268 virtual std::vector<std::string> proc_names(); 00269 00270 virtual void set_debug(int debug); 00271 virtual int debug(); 00272 }; 00273 00274 #endif /* INCLUDED_GC_JOB_MANAGER_IMPL_H */