GNU Radio 3.6.5 C++ API

gr_block.h

Go to the documentation of this file.
00001 /* -*- c++ -*- */
00002 /*
00003  * Copyright 2004,2007,2009,2010 Free Software Foundation, Inc.
00004  *
00005  * This file is part of GNU Radio
00006  *
00007  * GNU Radio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 3, or (at your option)
00010  * any later version.
00011  *
00012  * GNU Radio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License
00018  * along with GNU Radio; see the file COPYING.  If not, write to
00019  * the Free Software Foundation, Inc., 51 Franklin Street,
00020  * Boston, MA 02110-1301, USA.
00021  */
00022 
00023 #ifndef INCLUDED_GR_BLOCK_H
00024 #define INCLUDED_GR_BLOCK_H
00025 
00026 #include <gr_core_api.h>
00027 #include <gr_basic_block.h>
00028 #include <gr_tags.h>
00029 #include <gr_logger.h>
00030 
00031 /*!
00032  * \brief The abstract base class for all 'terminal' processing blocks.
00033  * \ingroup base_blk
00034  *
00035  * A signal processing flow is constructed by creating a tree of
00036  * hierarchical blocks, which at any level may also contain terminal nodes
00037  * that actually implement signal processing functions. This is the base
00038  * class for all such leaf nodes.
00039 
00040  * Blocks have a set of input streams and output streams.  The
00041  * input_signature and output_signature define the number of input
00042  * streams and output streams respectively, and the type of the data
00043  * items in each stream.
00044  *
00045  * Although blocks may consume data on each input stream at a
00046  * different rate, all output streams must produce data at the same
00047  * rate.  That rate may be different from any of the input rates.
00048  *
00049  * User derived blocks override two methods, forecast and general_work,
00050  * to implement their signal processing behavior. forecast is called
00051  * by the system scheduler to determine how many items are required on
00052  * each input stream in order to produce a given number of output
00053  * items.
00054  *
00055  * general_work is called to perform the signal processing in the block.
00056  * It reads the input items and writes the output items.
00057  */
00058 
00059 class GR_CORE_API gr_block : public gr_basic_block {
00060 
00061  public:
00062 
00063   //! Magic return values from general_work
00064   enum {
00065     WORK_CALLED_PRODUCE = -2,
00066     WORK_DONE = -1
00067   };
00068 
00069   enum tag_propagation_policy_t {
00070     TPP_DONT = 0,
00071     TPP_ALL_TO_ALL = 1,
00072     TPP_ONE_TO_ONE = 2
00073   };
00074 
00075   virtual ~gr_block ();
00076 
00077   /*!
00078    * Assume block computes y_i = f(x_i, x_i-1, x_i-2, x_i-3...)
00079    * History is the number of x_i's that are examined to produce one y_i.
00080    * This comes in handy for FIR filters, where we use history to
00081    * ensure that our input contains the appropriate "history" for the
00082    * filter.   History should be equal to the number of filter taps.
00083    */
00084   unsigned history () const { return d_history; }
00085   void  set_history (unsigned history) { d_history = history; }
00086 
00087   /*!
00088    * \brief Return true if this block has a fixed input to output rate.
00089    *
00090    * If true, then fixed_rate_in_to_out and fixed_rate_out_to_in may be called.
00091    */
00092   bool fixed_rate() const { return d_fixed_rate; }
00093 
00094   // ----------------------------------------------------------------
00095   //            override these to define your behavior
00096   // ----------------------------------------------------------------
00097 
00098   /*!
00099    * \brief  Estimate input requirements given output request
00100    *
00101    * \param noutput_items           number of output items to produce
00102    * \param ninput_items_required   number of input items required on each input stream
00103    *
00104    * Given a request to product \p noutput_items, estimate the number of
00105    * data items required on each input stream.  The estimate doesn't have
00106    * to be exact, but should be close.
00107    */
00108   virtual void forecast (int noutput_items,
00109                          gr_vector_int &ninput_items_required);
00110 
00111   /*!
00112    * \brief compute output items from input items
00113    *
00114    * \param noutput_items       number of output items to write on each output stream
00115    * \param ninput_items        number of input items available on each input stream
00116    * \param input_items         vector of pointers to the input items, one entry per input stream
00117    * \param output_items        vector of pointers to the output items, one entry per output stream
00118    *
00119    * \returns number of items actually written to each output stream, or -1 on EOF.
00120    * It is OK to return a value less than noutput_items.  -1 <= return value <= noutput_items
00121    *
00122    * general_work must call consume or consume_each to indicate how many items
00123    * were consumed on each input stream.
00124    */
00125   virtual int general_work (int noutput_items,
00126                             gr_vector_int &ninput_items,
00127                             gr_vector_const_void_star &input_items,
00128                             gr_vector_void_star &output_items);
00129 
00130   /*!
00131    * \brief Called to enable drivers, etc for i/o devices.
00132    *
00133    * This allows a block to enable an associated driver to begin
00134    * transfering data just before we start to execute the scheduler.
00135    * The end result is that this reduces latency in the pipeline when
00136    * dealing with audio devices, usrps, etc.
00137    */
00138   virtual bool start();
00139 
00140   /*!
00141    * \brief Called to disable drivers, etc for i/o devices.
00142    */
00143   virtual bool stop();
00144 
00145   // ----------------------------------------------------------------
00146 
00147   /*!
00148    * \brief Constrain the noutput_items argument passed to forecast and general_work
00149    *
00150    * set_output_multiple causes the scheduler to ensure that the noutput_items
00151    * argument passed to forecast and general_work will be an integer multiple
00152    * of \param multiple  The default value of output multiple is 1.
00153    */
00154   void set_output_multiple (int multiple);
00155   int  output_multiple () const { return d_output_multiple; }
00156   bool  output_multiple_set () const { return d_output_multiple_set; }
00157 
00158   /*!
00159    * \brief Constrains buffers to work on a set item alignment (for SIMD)
00160    *
00161    * set_alignment_multiple causes the scheduler to ensure that the noutput_items
00162    * argument passed to forecast and general_work will be an integer multiple
00163    * of \param multiple  The default value is 1.
00164    *
00165    * This control is similar to the output_multiple setting, except
00166    * that if the number of items passed to the block is less than the
00167    * output_multiple, this value is ignored and the block can produce
00168    * like normal. The d_unaligned value is set to the number of items
00169    * the block is off by. In the next call to general_work, the
00170    * noutput_items is set to d_unaligned or less until
00171    * d_unaligned==0. The buffers are now aligned again and the aligned
00172    * calls can be performed again.
00173    */
00174   void set_alignment (int multiple);
00175   int  alignment () const { return d_output_multiple; }
00176 
00177   void set_unaligned (int na);
00178   int unaligned () const { return d_unaligned; }
00179   void set_is_unaligned (bool u);
00180   bool is_unaligned () const { return d_is_unaligned; }
00181 
00182   /*!
00183    * \brief Tell the scheduler \p how_many_items of input stream \p which_input were consumed.
00184    */
00185   void consume (int which_input, int how_many_items);
00186 
00187   /*!
00188    * \brief Tell the scheduler \p how_many_items were consumed on each input stream.
00189    */
00190   void consume_each (int how_many_items);
00191 
00192   /*!
00193    * \brief Tell the scheduler \p how_many_items were produced on output stream \p which_output.
00194    *
00195    * If the block's general_work method calls produce, \p general_work must return WORK_CALLED_PRODUCE.
00196    */
00197   void produce (int which_output, int how_many_items);
00198 
00199   /*!
00200    * \brief Set the approximate output rate / input rate
00201    *
00202    * Provide a hint to the buffer allocator and scheduler.
00203    * The default relative_rate is 1.0
00204    *
00205    * decimators have relative_rates < 1.0
00206    * interpolators have relative_rates > 1.0
00207    */
00208   void  set_relative_rate (double relative_rate);
00209 
00210   /*!
00211    * \brief return the approximate output rate / input rate
00212    */
00213   double relative_rate () const { return d_relative_rate; }
00214 
00215   /*
00216    * The following two methods provide special case info to the
00217    * scheduler in the event that a block has a fixed input to output
00218    * ratio.  gr_sync_block, gr_sync_decimator and gr_sync_interpolator
00219    * override these.  If you're fixed rate, subclass one of those.
00220    */
00221   /*!
00222    * \brief Given ninput samples, return number of output samples that will be produced.
00223    * N.B. this is only defined if fixed_rate returns true.
00224    * Generally speaking, you don't need to override this.
00225    */
00226   virtual int fixed_rate_ninput_to_noutput(int ninput);
00227 
00228   /*!
00229    * \brief Given noutput samples, return number of input samples required to produce noutput.
00230    * N.B. this is only defined if fixed_rate returns true.
00231    * Generally speaking, you don't need to override this.
00232    */
00233   virtual int fixed_rate_noutput_to_ninput(int noutput);
00234 
00235   /*!
00236    * \brief Return the number of items read on input stream which_input
00237    */
00238   uint64_t nitems_read(unsigned int which_input);
00239 
00240   /*!
00241    * \brief  Return the number of items written on output stream which_output
00242    */
00243   uint64_t nitems_written(unsigned int which_output);
00244 
00245   /*!
00246    * \brief Asks for the policy used by the scheduler to moved tags downstream.
00247    */
00248   tag_propagation_policy_t tag_propagation_policy();
00249 
00250   /*!
00251    * \brief Set the policy by the scheduler to determine how tags are moved downstream.
00252    */
00253   void set_tag_propagation_policy(tag_propagation_policy_t p);
00254 
00255   /*!
00256    * \brief Return the minimum number of output items this block can
00257    * produce during a call to work.
00258    *
00259    * Should be 0 for most blocks.  Useful if we're dealing with packets and
00260    * the block produces one packet per call to work.
00261   */
00262   int min_noutput_items() const { return d_min_noutput_items; }
00263 
00264   /*!
00265    * \brief Set the minimum number of output items this block can
00266    * produce during a call to work.
00267    *
00268    * \param m the minimum noutput_items this block can produce.
00269    */
00270   void set_min_noutput_items(int m) { d_min_noutput_items = m; }
00271 
00272   /*!
00273    * \brief Return the maximum number of output items this block will
00274    * handle during a call to work.
00275    */
00276   int max_noutput_items();
00277 
00278   /*!
00279    * \brief Set the maximum number of output items this block will
00280    * handle during a call to work.
00281    *
00282    * \param m the maximum noutput_items this block will handle.
00283    */
00284   void set_max_noutput_items(int m);
00285 
00286   /*!
00287    * \brief Clear the switch for using the max_noutput_items value of this block.
00288    *
00289    * When is_set_max_noutput_items() returns 'true', the scheduler
00290    * will use the value returned by max_noutput_items() to limit the
00291    * size of the number of items possible for this block's work
00292    * function. If is_set_max_notput_items() returns 'false', then the
00293    * scheduler ignores the internal value and uses the value set
00294    * globally in the top_block.
00295    *
00296    * Use this value to clear the 'is_set' flag so the scheduler will
00297    * ignore this. Use the set_max_noutput_items(m) call to both set a
00298    * new value for max_noutput_items and to reenable its use in the
00299    * scheduler.
00300    */
00301   void unset_max_noutput_items();
00302 
00303   /*!
00304    * \brief Ask the block if the flag is or is not set to use the
00305    * internal value of max_noutput_items during a call to work.
00306    */
00307   bool is_set_max_noutput_items();
00308 
00309   /*
00310    * Used to expand the vectors that hold the min/max buffer sizes.
00311    *
00312    * Specifically, when -1 is used, the vectors are just initialized
00313    * with 1 value; this is used by the flat_flowgraph to expand when
00314    * required to add a new value for new ports on these blocks.
00315    */
00316   void expand_minmax_buffer(int port) {
00317     if((size_t)port >= d_max_output_buffer.size())
00318       set_max_output_buffer(port, -1);
00319     if((size_t)port >= d_min_output_buffer.size())
00320       set_min_output_buffer(port, -1);
00321   }
00322 
00323   /*!
00324    * \brief Returns max buffer size on output port \p i.
00325    */
00326   long max_output_buffer(size_t i) {
00327     if(i >= d_max_output_buffer.size())
00328       throw std::invalid_argument("gr_basic_block::max_output_buffer: port out of range.");
00329     return d_max_output_buffer[i];
00330   }
00331 
00332   /*!
00333    * \brief Sets max buffer size on all output ports.
00334    */
00335   void set_max_output_buffer(long max_output_buffer) { 
00336     for(int i = 0; i < output_signature()->max_streams(); i++) {
00337       set_max_output_buffer(i, max_output_buffer);
00338     }
00339   }
00340 
00341   /*!
00342    * \brief Sets max buffer size on output port \p port.
00343    */
00344   void set_max_output_buffer(int port, long max_output_buffer) {
00345     if((size_t)port >= d_max_output_buffer.size())
00346       d_max_output_buffer.push_back(max_output_buffer);
00347     else
00348       d_max_output_buffer[port] = max_output_buffer; 
00349   }
00350 
00351   /*!
00352    * \brief Returns min buffer size on output port \p i.
00353    */
00354   long min_output_buffer(size_t i) {
00355     if(i >= d_min_output_buffer.size())
00356       throw std::invalid_argument("gr_basic_block::min_output_buffer: port out of range.");
00357     return d_min_output_buffer[i];
00358   }
00359 
00360   /*!
00361    * \brief Sets min buffer size on all output ports.
00362    */
00363   void set_min_output_buffer(long min_output_buffer) {
00364     for(int i=0; i<output_signature()->max_streams(); i++) {
00365       set_min_output_buffer(i, min_output_buffer);
00366     }
00367   }
00368 
00369   /*!
00370    * \brief Sets min buffer size on output port \p port.
00371    */
00372   void set_min_output_buffer(int port, long min_output_buffer) {
00373     if((size_t)port >= d_min_output_buffer.size())
00374       d_min_output_buffer.push_back(min_output_buffer);
00375     else
00376       d_min_output_buffer[port] = min_output_buffer; 
00377   }
00378 
00379   // --------------- Performance counter functions -------------
00380 
00381   /*!
00382    * \brief Gets average noutput_items performance counter.
00383    */
00384   float pc_noutput_items();
00385 
00386   /*!
00387    * \brief Gets variance of noutput_items performance counter.
00388    */
00389   float pc_noutput_items_var();
00390 
00391   /*!
00392    * \brief Gets average num items produced performance counter.
00393    */
00394   float pc_nproduced();
00395 
00396   /*!
00397    * \brief Gets variance of  num items produced performance counter.
00398    */
00399   float pc_nproduced_var();
00400 
00401   /*!
00402    * \brief Gets average fullness of \p which input buffer.
00403    */
00404   float pc_input_buffers_full(int which);
00405 
00406   /*!
00407    * \brief Gets variance of fullness of \p which input buffer.
00408    */
00409   float pc_input_buffers_full_var(int which);
00410 
00411   /*!
00412    * \brief Gets average fullness of all input buffers.
00413    */
00414   std::vector<float> pc_input_buffers_full();
00415 
00416   /*!
00417    * \brief Gets variance of fullness of all input buffers.
00418    */
00419   std::vector<float> pc_input_buffers_full_var();
00420 
00421   /*!
00422    * \brief Gets average fullness of \p which input buffer.
00423    */
00424   float pc_output_buffers_full(int which);
00425 
00426   /*!
00427    * \brief Gets variance of fullness of \p which input buffer.
00428    */
00429   float pc_output_buffers_full_var(int which);
00430 
00431   /*!
00432    * \brief Gets average fullness of all output buffers.
00433    */
00434   std::vector<float> pc_output_buffers_full();
00435   /*!
00436    * \brief Gets variance of fullness of all output buffers.
00437    */
00438   std::vector<float> pc_output_buffers_full_var();
00439 
00440   /*!
00441    * \brief Gets average clock cycles spent in work.
00442    */
00443   float pc_work_time();
00444 
00445   /*!
00446    * \brief Gets average clock cycles spent in work.
00447    */
00448   float pc_work_time_var();
00449 
00450   /*!
00451    * \brief Resets the performance counters
00452    */
00453   void reset_perf_counters();
00454 
00455 
00456   // ----------------------------------------------------------------------------
00457   // Functions to handle thread affinity
00458 
00459   /*!
00460    * \brief Set the thread's affinity to processor core \p n.
00461    *
00462    * \param mask a vector of ints of the core numbers available to this block.
00463    */
00464   void set_processor_affinity(const std::vector<int> &mask);
00465 
00466   /*!
00467    * \brief Remove processor affinity to a specific core.
00468    */
00469   void unset_processor_affinity();
00470 
00471   /*!
00472    * \brief Get the current processor affinity.
00473    */
00474   std::vector<int> processor_affinity() { return d_affinity; }
00475 
00476   // ----------------------------------------------------------------------------
00477 
00478  private:
00479 
00480   int                   d_output_multiple;
00481   bool                  d_output_multiple_set;
00482   int                   d_unaligned;
00483   bool                  d_is_unaligned;
00484   double                d_relative_rate;        // approx output_rate / input_rate
00485   gr_block_detail_sptr  d_detail;               // implementation details
00486   unsigned              d_history;
00487   bool                  d_fixed_rate;
00488   bool                  d_max_noutput_items_set;     // if d_max_noutput_items is valid
00489   int                   d_max_noutput_items;         // value of max_noutput_items for this block
00490   int                   d_min_noutput_items;
00491   tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream
00492   std::vector<int>      d_affinity;              // thread affinity proc. mask
00493 
00494  protected:
00495   gr_block (void){} //allows pure virtual interface sub-classes
00496   gr_block (const std::string &name,
00497             gr_io_signature_sptr input_signature,
00498             gr_io_signature_sptr output_signature);
00499 
00500   void set_fixed_rate(bool fixed_rate){ d_fixed_rate = fixed_rate; }
00501 
00502 
00503   /*!
00504    * \brief  Adds a new tag onto the given output buffer.
00505    *
00506    * \param which_output an integer of which output stream to attach the tag
00507    * \param abs_offset   a uint64 number of the absolute item number
00508    *                     assicated with the tag. Can get from nitems_written.
00509    * \param key          the tag key as a PMT symbol
00510    * \param value        any PMT holding any value for the given key
00511    * \param srcid        optional source ID specifier; defaults to PMT_F
00512    */
00513   inline void add_item_tag(unsigned int which_output,
00514                     uint64_t abs_offset,
00515                     const pmt::pmt_t &key,
00516                     const pmt::pmt_t &value,
00517                     const pmt::pmt_t &srcid=pmt::PMT_F)
00518     {
00519         gr_tag_t tag;
00520         tag.offset = abs_offset;
00521         tag.key = key;
00522         tag.value = value;
00523         tag.srcid = srcid;
00524         this->add_item_tag(which_output, tag);
00525     }
00526 
00527  /*!
00528    * \brief  Adds a new tag onto the given output buffer.
00529    *
00530    * \param which_output an integer of which output stream to attach the tag
00531    * \param tag the tag object to add
00532    */
00533   void add_item_tag(unsigned int which_output, const gr_tag_t &tag);
00534 
00535   /*!
00536    * \brief  Removes a tag from the given input buffer.
00537    *
00538    * \param which_input an integer of which input stream to remove the tag from
00539    * \param abs_offset   a uint64 number of the absolute item number
00540    *                     assicated with the tag. Can get from nitems_written.
00541    * \param key          the tag key as a PMT symbol
00542    * \param value        any PMT holding any value for the given key
00543    * \param srcid        optional source ID specifier; defaults to PMT_F
00544    *
00545    * If no such tag is found, does nothing.
00546    */
00547   inline void remove_item_tag(unsigned int which_input,
00548                     uint64_t abs_offset,
00549                     const pmt::pmt_t &key,
00550                     const pmt::pmt_t &value,
00551                     const pmt::pmt_t &srcid=pmt::PMT_F)
00552   {
00553       gr_tag_t tag;
00554       tag.offset = abs_offset;
00555       tag.key = key;
00556       tag.value = value;
00557       tag.srcid = srcid;
00558       this->remove_item_tag(which_input, tag);
00559   }
00560 
00561  /*!
00562    * \brief  Removes a tag from the given input buffer.
00563    *
00564    * If no such tag is found, does nothing.
00565    *
00566    * \param which_input an integer of which input stream to remove the tag from
00567    * \param tag the tag object to remove
00568    */
00569   void remove_item_tag(unsigned int which_input, const gr_tag_t &tag);
00570 
00571   /*!
00572    * \brief Given a [start,end), returns a vector of all tags in the range.
00573    *
00574    * Range of counts is from start to end-1.
00575    *
00576    * Tags are tuples of:
00577    *      (item count, source id, key, value)
00578    *
00579    * \param v            a vector reference to return tags into
00580    * \param which_input  an integer of which input stream to pull from
00581    * \param abs_start    a uint64 count of the start of the range of interest
00582    * \param abs_end      a uint64 count of the end of the range of interest
00583    */
00584   void get_tags_in_range(std::vector<gr_tag_t> &v,
00585                          unsigned int which_input,
00586                          uint64_t abs_start,
00587                          uint64_t abs_end);
00588 
00589   /*!
00590    * \brief Given a [start,end), returns a vector of all tags in the range
00591    * with a given key.
00592    *
00593    * Range of counts is from start to end-1.
00594    *
00595    * Tags are tuples of:
00596    *      (item count, source id, key, value)
00597    *
00598    * \param v            a vector reference to return tags into
00599    * \param which_input  an integer of which input stream to pull from
00600    * \param abs_start    a uint64 count of the start of the range of interest
00601    * \param abs_end      a uint64 count of the end of the range of interest
00602    * \param key          a PMT symbol key to filter only tags of this key
00603    */
00604   void get_tags_in_range(std::vector<gr_tag_t> &v,
00605                          unsigned int which_input,
00606                          uint64_t abs_start,
00607                          uint64_t abs_end,
00608                          const pmt::pmt_t &key);
00609 
00610   std::vector<long>    d_max_output_buffer;
00611   std::vector<long>    d_min_output_buffer;
00612 
00613   /*! Used by block's setters and work functions to make
00614    * setting/resetting of parameters thread-safe.
00615    *
00616    * Used by calling gruel::scoped_lock l(d_setlock);
00617    */ 
00618   gruel::mutex d_setlock;
00619 
00620   /*! Used by blocks to access the logger system.
00621    */ 
00622   gr_logger_ptr d_logger;
00623   gr_logger_ptr d_debug_logger;
00624 
00625   // These are really only for internal use, but leaving them public avoids
00626   // having to work up an ever-varying list of friend GR_CORE_APIs
00627 
00628  public:
00629   gr_block_detail_sptr detail () const { return d_detail; }
00630   void set_detail (gr_block_detail_sptr detail) { d_detail = detail; }
00631 };
00632 
00633 typedef std::vector<gr_block_sptr> gr_block_vector_t;
00634 typedef std::vector<gr_block_sptr>::iterator gr_block_viter_t;
00635 
00636 inline gr_block_sptr cast_to_block_sptr(gr_basic_block_sptr p)
00637 {
00638   return boost::dynamic_pointer_cast<gr_block, gr_basic_block>(p);
00639 }
00640 
00641 
00642 std::ostream&
00643 operator << (std::ostream& os, const gr_block *m);
00644 
00645 #endif /* INCLUDED_GR_BLOCK_H */