diff options
author | Johnathan Corgan <johnathan@corganlabs.com> | 2014-05-07 11:36:10 -0700 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2014-05-07 11:36:10 -0700 |
commit | a3127199b8a380ff8abeaa572da1e191a9b396ea (patch) | |
tree | 6397f72ad87513b5a94e0fccf0749b2e6f4a2320 | |
parent | ddb4c1d744b417e79cf2d263d6cd8db37c4dbcb2 (diff) | |
parent | a036dd2f3e74387316946fbac47282a9e7c1e87d (diff) |
Merge remote-tracking branch 'iohannez/vlen' into zeromq
-rw-r--r-- | gr-zeromq/examples/zeromq_pushpull.grc | 477 | ||||
-rw-r--r-- | gr-zeromq/examples/zeromq_reqrep.grc | 477 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/pub_sink.h | 3 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/pull_source.h | 1 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/push_sink.h | 1 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/rep_sink.h | 1 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/req_source.h | 2 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.cc | 14 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.h | 3 | ||||
-rw-r--r-- | gr-zeromq/lib/pull_source_impl.cc | 29 | ||||
-rw-r--r-- | gr-zeromq/lib/pull_source_impl.h | 1 | ||||
-rw-r--r-- | gr-zeromq/lib/push_sink_impl.cc | 9 | ||||
-rw-r--r-- | gr-zeromq/lib/push_sink_impl.h | 1 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.cc | 10 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.h | 1 | ||||
-rw-r--r-- | gr-zeromq/lib/req_source_impl.cc | 27 | ||||
-rw-r--r-- | gr-zeromq/lib/req_source_impl.h | 1 | ||||
-rwxr-xr-x | gr-zeromq/python/zeromq/qa_zeromq_pushpull.py | 50 | ||||
-rwxr-xr-x | gr-zeromq/python/zeromq/qa_zeromq_reqrep.py | 50 |
19 files changed, 1111 insertions, 47 deletions
diff --git a/gr-zeromq/examples/zeromq_pushpull.grc b/gr-zeromq/examples/zeromq_pushpull.grc new file mode 100644 index 0000000000..0cff6738ed --- /dev/null +++ b/gr-zeromq/examples/zeromq_pushpull.grc @@ -0,0 +1,477 @@ +<?xml version='1.0' encoding='ASCII'?> +<flow_graph> + <timestamp>Wed May 7 12:06:31 2014</timestamp> + <block> + <key>options</key> + <param> + <key>id</key> + <value>top_block</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>title</key> + <value></value> + </param> + <param> + <key>author</key> + <value></value> + </param> + <param> + <key>description</key> + <value></value> + </param> + <param> + <key>window_size</key> + <value>1280, 1024</value> + </param> + <param> + <key>generate_options</key> + <value>wx_gui</value> + </param> + <param> + <key>category</key> + <value>Custom</value> + </param> + <param> + <key>run_options</key> + <value>prompt</value> + </param> + <param> + <key>run</key> + <value>True</value> + </param> + <param> + <key>max_nouts</key> + <value>0</value> + </param> + <param> + <key>realtime_scheduling</key> + <value></value> + </param> + <param> + <key>_coordinate</key> + <value>(10, 10)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>variable</key> + <param> + <key>id</key> + <value>samp_rate</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>value</key> + <value>32000</value> + </param> + <param> + <key>_coordinate</key> + <value>(10, 170)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>wxgui_scopesink2</key> + <param> + <key>id</key> + <value>wxgui_scopesink2_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>title</key> + <value>Scope Plot</value> + </param> + <param> + <key>samp_rate</key> + <value>samp_rate</value> + </param> + <param> + <key>v_scale</key> + <value>0</value> + </param> + <param> + <key>v_offset</key> + <value>0</value> + </param> + <param> + <key>t_scale</key> + <value>0</value> + </param> + <param> + <key>ac_couple</key> + <value>False</value> + </param> + <param> + <key>xy_mode</key> + <value>False</value> + </param> + <param> + <key>num_inputs</key> + <value>1</value> + </param> + <param> + <key>win_size</key> + <value></value> + </param> + <param> + <key>grid_pos</key> + <value></value> + </param> + <param> + <key>notebook</key> + <value></value> + </param> + <param> + <key>trig_mode</key> + <value>wxgui.TRIG_MODE_AUTO</value> + </param> + <param> + <key>y_axis_label</key> + <value>Counts</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>_coordinate</key> + <value>(567, 269)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>analog_sig_source_x</key> + <param> + <key>id</key> + <value>analog_sig_source_x_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>samp_rate</key> + <value>samp_rate</value> + </param> + <param> + <key>waveform</key> + <value>analog.GR_TRI_WAVE</value> + </param> + <param> + <key>freq</key> + <value>2000</value> + </param> + <param> + <key>amp</key> + <value>1</value> + </param> + <param> + <key>offset</key> + <value>0</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> + <key>_coordinate</key> + <value>(170, 78)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>analog_fastnoise_source_x</key> + <param> + <key>id</key> + <value>analog_fastnoise_source_x_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>noise_type</key> + <value>analog.GR_GAUSSIAN</value> + </param> + <param> + <key>amp</key> + <value>1</value> + </param> + <param> + <key>seed</key> + <value>0</value> + </param> + <param> + <key>samples</key> + <value>8192</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> + <key>_coordinate</key> + <value>(169, 196)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>blocks_add_xx</key> + <param> + <key>id</key> + <value>blocks_add_xx_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>num_inputs</key> + <value>2</value> + </param> + <param> + <key>vlen</key> + <value>1</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> + <key>_coordinate</key> + <value>(403, 116)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>blocks_throttle</key> + <param> + <key>id</key> + <value>blocks_throttle_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>samples_per_second</key> + <value>samp_rate</value> + </param> + <param> + <key>vlen</key> + <value>1</value> + </param> + <param> + <key>ignoretag</key> + <value>True</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> + <key>_coordinate</key> + <value>(537, 114)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>zeromq_pull_source</key> + <param> + <key>id</key> + <value>zeromq_pull_source_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>vlen</key> + <value>1</value> + </param> + <param> + <key>address</key> + <value>tcp://localhost:5555</value> + </param> + <param> + <key>timeout</key> + <value>0.1</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> + <key>_coordinate</key> + <value>(137, 307)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>zeromq_push_sink</key> + <param> + <key>id</key> + <value>zeromq_push_sink_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>vlen</key> + <value>1</value> + </param> + <param> + <key>address</key> + <value>tcp://*:5555</value> + </param> + <param> + <key>blocking</key> + <value>False</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>_coordinate</key> + <value>(752, 96)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <connection> + <source_block_id>blocks_throttle_0</source_block_id> + <sink_block_id>zeromq_push_sink_0</sink_block_id> + <source_key>0</source_key> + <sink_key>0</sink_key> + </connection> + <connection> + <source_block_id>zeromq_pull_source_0</source_block_id> + <sink_block_id>wxgui_scopesink2_0</sink_block_id> + <source_key>0</source_key> + <sink_key>0</sink_key> + </connection> + <connection> + <source_block_id>analog_sig_source_x_0</source_block_id> + <sink_block_id>blocks_add_xx_0</sink_block_id> + <source_key>0</source_key> + <sink_key>0</sink_key> + </connection> + <connection> + <source_block_id>analog_fastnoise_source_x_0</source_block_id> + <sink_block_id>blocks_add_xx_0</sink_block_id> + <source_key>0</source_key> + <sink_key>1</sink_key> + </connection> + <connection> + <source_block_id>blocks_add_xx_0</source_block_id> + <sink_block_id>blocks_throttle_0</sink_block_id> + <source_key>0</source_key> + <sink_key>0</sink_key> + </connection> +</flow_graph> diff --git a/gr-zeromq/examples/zeromq_reqrep.grc b/gr-zeromq/examples/zeromq_reqrep.grc new file mode 100644 index 0000000000..2951acd120 --- /dev/null +++ b/gr-zeromq/examples/zeromq_reqrep.grc @@ -0,0 +1,477 @@ +<?xml version='1.0' encoding='ASCII'?> +<flow_graph> + <timestamp>Wed May 7 12:00:20 2014</timestamp> + <block> + <key>options</key> + <param> + <key>id</key> + <value>top_block</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>title</key> + <value></value> + </param> + <param> + <key>author</key> + <value></value> + </param> + <param> + <key>description</key> + <value></value> + </param> + <param> + <key>window_size</key> + <value>1280, 1024</value> + </param> + <param> + <key>generate_options</key> + <value>wx_gui</value> + </param> + <param> + <key>category</key> + <value>Custom</value> + </param> + <param> + <key>run_options</key> + <value>prompt</value> + </param> + <param> + <key>run</key> + <value>True</value> + </param> + <param> + <key>max_nouts</key> + <value>0</value> + </param> + <param> + <key>realtime_scheduling</key> + <value></value> + </param> + <param> + <key>_coordinate</key> + <value>(10, 10)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>variable</key> + <param> + <key>id</key> + <value>samp_rate</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>value</key> + <value>32000</value> + </param> + <param> + <key>_coordinate</key> + <value>(10, 170)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>analog_sig_source_x</key> + <param> + <key>id</key> + <value>analog_sig_source_x_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>samp_rate</key> + <value>samp_rate</value> + </param> + <param> + <key>waveform</key> + <value>analog.GR_TRI_WAVE</value> + </param> + <param> + <key>freq</key> + <value>2000</value> + </param> + <param> + <key>amp</key> + <value>1</value> + </param> + <param> + <key>offset</key> + <value>0</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> + <key>_coordinate</key> + <value>(170, 78)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>analog_fastnoise_source_x</key> + <param> + <key>id</key> + <value>analog_fastnoise_source_x_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>noise_type</key> + <value>analog.GR_GAUSSIAN</value> + </param> + <param> + <key>amp</key> + <value>1</value> + </param> + <param> + <key>seed</key> + <value>0</value> + </param> + <param> + <key>samples</key> + <value>8192</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> + <key>_coordinate</key> + <value>(169, 196)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>blocks_add_xx</key> + <param> + <key>id</key> + <value>blocks_add_xx_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>num_inputs</key> + <value>2</value> + </param> + <param> + <key>vlen</key> + <value>1</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> + <key>_coordinate</key> + <value>(403, 116)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>blocks_throttle</key> + <param> + <key>id</key> + <value>blocks_throttle_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>samples_per_second</key> + <value>samp_rate</value> + </param> + <param> + <key>vlen</key> + <value>1</value> + </param> + <param> + <key>ignoretag</key> + <value>True</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> + <key>_coordinate</key> + <value>(537, 114)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>wxgui_scopesink2</key> + <param> + <key>id</key> + <value>wxgui_scopesink2_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>title</key> + <value>Scope Plot</value> + </param> + <param> + <key>samp_rate</key> + <value>samp_rate</value> + </param> + <param> + <key>v_scale</key> + <value>0</value> + </param> + <param> + <key>v_offset</key> + <value>0</value> + </param> + <param> + <key>t_scale</key> + <value>0</value> + </param> + <param> + <key>ac_couple</key> + <value>False</value> + </param> + <param> + <key>xy_mode</key> + <value>False</value> + </param> + <param> + <key>num_inputs</key> + <value>1</value> + </param> + <param> + <key>win_size</key> + <value></value> + </param> + <param> + <key>grid_pos</key> + <value></value> + </param> + <param> + <key>notebook</key> + <value></value> + </param> + <param> + <key>trig_mode</key> + <value>wxgui.TRIG_MODE_AUTO</value> + </param> + <param> + <key>y_axis_label</key> + <value>Counts</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>_coordinate</key> + <value>(587, 307)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>zeromq_rep_sink</key> + <param> + <key>id</key> + <value>zeromq_rep_sink_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>vlen</key> + <value>1</value> + </param> + <param> + <key>address</key> + <value>tcp://*:5555</value> + </param> + <param> + <key>timeout</key> + <value>0.1</value> + </param> + <param> + <key>blocking</key> + <value>True</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>_coordinate</key> + <value>(732, 92)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <block> + <key>zeromq_req_source</key> + <param> + <key>id</key> + <value>zeromq_req_source_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>complex</value> + </param> + <param> + <key>vlen</key> + <value>1</value> + </param> + <param> + <key>address</key> + <value>tcp://localhost:5555</value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> + <key>_coordinate</key> + <value>(277, 327)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> + <connection> + <source_block_id>analog_sig_source_x_0</source_block_id> + <sink_block_id>blocks_add_xx_0</sink_block_id> + <source_key>0</source_key> + <sink_key>0</sink_key> + </connection> + <connection> + <source_block_id>analog_fastnoise_source_x_0</source_block_id> + <sink_block_id>blocks_add_xx_0</sink_block_id> + <source_key>0</source_key> + <sink_key>1</sink_key> + </connection> + <connection> + <source_block_id>blocks_add_xx_0</source_block_id> + <sink_block_id>blocks_throttle_0</sink_block_id> + <source_key>0</source_key> + <sink_key>0</sink_key> + </connection> + <connection> + <source_block_id>blocks_throttle_0</source_block_id> + <sink_block_id>zeromq_rep_sink_0</sink_block_id> + <source_key>0</source_key> + <sink_key>0</sink_key> + </connection> + <connection> + <source_block_id>zeromq_req_source_0</source_block_id> + <sink_block_id>wxgui_scopesink2_0</sink_block_id> + <source_key>0</source_key> + <sink_key>0</sink_key> + </connection> +</flow_graph> diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h index 7c5734ee12..46ad80b0b3 100644 --- a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h @@ -49,10 +49,11 @@ namespace gr { * \brief Return a shared_ptr to a new instance of zeromq::pub_sink. * * \param itemsize Size of a stream item in bytes + * \param vlen Vector length of the input items. Note that one vector is one item. * \param address ZMQ socket address specifier * \param blocking Indicate whether blocking sends should be used, default true. */ - static sptr make(size_t itemsize, char *address, bool blocking=true); + static sptr make(size_t itemsize, size_t vlen, char *address, bool blocking=true); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_source.h b/gr-zeromq/include/gnuradio/zeromq/pull_source.h index 12c1005174..4306931e6e 100644 --- a/gr-zeromq/include/gnuradio/zeromq/pull_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/pull_source.h @@ -46,6 +46,7 @@ namespace gr { * \brief Return a shared_ptr to a new instance of gr::zeromq::pull_source. * * \param itemsize Size of a stream item in bytes + * \param vlen Vector length of the input items. Note that one vector is one item. * \param address ZMQ socket address specifier * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_sink.h index 2f68e44ca0..46ad6a4863 100644 --- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h @@ -50,6 +50,7 @@ namespace gr { * \brief Return a shared_ptr to a new instance of gr::zeromq::push_sink * * \param itemsize Size of a stream item in bytes + * \param vlen Vector length of the input items. Note that one vector is one item. * \param address ZMQ socket address specifier * \param blocking Indicate whether blocking sends should be used, default true. * diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h index 9c2cafba12..374607e4b6 100644 --- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h @@ -48,6 +48,7 @@ namespace gr { * \brief Return a shared_ptr to a new instance of zeromq::rep_sink. * * \param itemsize Size of a stream item in bytes + * \param vlen Vector length of the input items. Note that one vector is one item. * \param address ZMQ socket address specifier * \param timeout Timeout for request poll, in seconds * \param blocking Indicate whether blocking sends should be used, default true. diff --git a/gr-zeromq/include/gnuradio/zeromq/req_source.h b/gr-zeromq/include/gnuradio/zeromq/req_source.h index 5d3f380f8b..5fc3682241 100644 --- a/gr-zeromq/include/gnuradio/zeromq/req_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/req_source.h @@ -47,7 +47,7 @@ namespace gr { * * * \param itemsize Size of a stream item in bytes - * \param vlen of the input items. + * \param vlen Vector length of the input items. Note that one vector is one item. * \param address ZMQ socket address specifier * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc index 086e995b2e..07002395f5 100644 --- a/gr-zeromq/lib/pub_sink_impl.cc +++ b/gr-zeromq/lib/pub_sink_impl.cc @@ -31,17 +31,17 @@ namespace gr { namespace zeromq { pub_sink::sptr - pub_sink::make(size_t itemsize, char *address, bool blocking) + pub_sink::make(size_t itemsize, size_t vlen, char *address, bool blocking) { return gnuradio::get_initial_sptr - (new pub_sink_impl(itemsize, address, blocking)); + (new pub_sink_impl(itemsize, vlen, address, blocking)); } - pub_sink_impl::pub_sink_impl(size_t itemsize, char *address, bool blocking) + pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, bool blocking) : gr::sync_block("pub_sink", - gr::io_signature::make(1, 1, itemsize), + gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_blocking(blocking) + d_itemsize(itemsize), d_vlen(vlen), d_blocking(blocking) { d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_PUB); @@ -62,8 +62,8 @@ namespace gr { const char *in = (const char *)input_items[0]; // create message copy and send - zmq::message_t msg(d_itemsize*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*noutput_items); + zmq::message_t msg(d_itemsize*d_vlen*noutput_items); + memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); return noutput_items; diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h index f314a756cd..8e6fac7485 100644 --- a/gr-zeromq/lib/pub_sink_impl.h +++ b/gr-zeromq/lib/pub_sink_impl.h @@ -33,12 +33,13 @@ namespace gr { { private: size_t d_itemsize; + size_t d_vlen; bool d_blocking; zmq::context_t *d_context; zmq::socket_t *d_socket; public: - pub_sink_impl(size_t itemsize, char *address, bool blocking); + pub_sink_impl(size_t itemsize, size_t vlen, char *address, bool blocking); ~pub_sink_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc index 9b9e50a38f..b29a056d84 100644 --- a/gr-zeromq/lib/pull_source_impl.cc +++ b/gr-zeromq/lib/pull_source_impl.cc @@ -41,7 +41,7 @@ namespace gr { : gr::sync_block("pull_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize) + d_itemsize(itemsize), d_vlen(vlen) { d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0; d_context = new zmq::context_t(1); @@ -71,21 +71,22 @@ namespace gr { // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t msg; - d_socket->recv(&msg); + // Receive data + zmq::message_t msg; + std::cout << "pull before" << std::endl; + d_socket->recv(&msg); + std::cout << "pull after" << std::endl; + // Copy to ouput buffer and return + if (msg.size() >= d_itemsize*d_vlen*noutput_items) { + memcpy(out, (void *)msg.data(), d_itemsize*d_vlen*noutput_items); - // Copy to ouput buffer and return - if (msg.size() >= d_itemsize*noutput_items) { - memcpy(out, (void *)msg.data(), d_itemsize*noutput_items); + return noutput_items; + } + else { + memcpy(out, (void *)msg.data(), msg.size()); - return noutput_items; - } - else { - memcpy(out, (void *)msg.data(), msg.size()); - - return msg.size()/d_itemsize; - } + return msg.size()/(d_itemsize*d_vlen); + } } else { return 0; // FIXME: someday when the scheduler does all the poll/selects diff --git a/gr-zeromq/lib/pull_source_impl.h b/gr-zeromq/lib/pull_source_impl.h index 6e3e686689..5140dc54bb 100644 --- a/gr-zeromq/lib/pull_source_impl.h +++ b/gr-zeromq/lib/pull_source_impl.h @@ -33,6 +33,7 @@ namespace gr { { private: size_t d_itemsize; + size_t d_vlen; int d_timeout; // microseconds, -1 is blocking zmq::context_t *d_context; zmq::socket_t *d_socket; diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc index 43ccd9dbca..1438e524a5 100644 --- a/gr-zeromq/lib/push_sink_impl.cc +++ b/gr-zeromq/lib/push_sink_impl.cc @@ -41,7 +41,7 @@ namespace gr { : gr::sync_block("push_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize) + d_itemsize(itemsize), d_vlen(vlen) { d_blocking = blocking; d_context = new zmq::context_t(1); @@ -63,10 +63,11 @@ namespace gr { const char *in = (const char *) input_items[0]; // create message copy and send - zmq::message_t msg(d_itemsize*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*noutput_items); + zmq::message_t msg(d_itemsize*d_vlen*noutput_items); + memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); + std::cout << "before" << std::endl; d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); - + std::cout << "after" << std::endl; return noutput_items; } diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h index f1a06433e5..a34bb28a70 100644 --- a/gr-zeromq/lib/push_sink_impl.h +++ b/gr-zeromq/lib/push_sink_impl.h @@ -33,6 +33,7 @@ namespace gr { { private: size_t d_itemsize; + size_t d_vlen; bool d_blocking; zmq::context_t *d_context; zmq::socket_t *d_socket; diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index 38bb7d779e..7e5f43df4c 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -41,7 +41,7 @@ namespace gr { : gr::sync_block("rep_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_blocking(blocking) + d_itemsize(itemsize), d_vlen(vlen), d_blocking(blocking) { d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0; d_context = new zmq::context_t(1); @@ -74,15 +74,15 @@ namespace gr { // create message copy and send if (noutput_items < req_output_items) { - zmq::message_t msg(d_itemsize*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*noutput_items); + zmq::message_t msg(d_itemsize*d_vlen*noutput_items); + memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); return noutput_items; } else { - zmq::message_t msg(d_itemsize*req_output_items); - memcpy((void *)msg.data(), in, d_itemsize*req_output_items); + zmq::message_t msg(d_itemsize*d_vlen*req_output_items); + memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items); d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); return req_output_items; diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h index 299ad06f2b..a933d94fa6 100644 --- a/gr-zeromq/lib/rep_sink_impl.h +++ b/gr-zeromq/lib/rep_sink_impl.h @@ -33,6 +33,7 @@ namespace gr { { private: size_t d_itemsize; + size_t d_vlen; int d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc index ffe31f9953..8dedf6c6af 100644 --- a/gr-zeromq/lib/req_source_impl.cc +++ b/gr-zeromq/lib/req_source_impl.cc @@ -41,7 +41,7 @@ namespace gr { : gr::sync_block("req_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize) + d_itemsize(itemsize), d_vlen(vlen) { d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); @@ -56,8 +56,8 @@ namespace gr { int req_source_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) { char *out = (char*)output_items[0]; @@ -66,10 +66,10 @@ namespace gr { // If we got a reply, process if (itemsout[0].revents & ZMQ_POLLOUT) { - // Request data, FIXME non portable - zmq::message_t request(sizeof(int)); - memcpy ((void *) request.data (), &noutput_items, sizeof(int)); - d_socket->send(request); + // Request data, FIXME non portable + zmq::message_t request(sizeof(int)); + memcpy ((void *) request.data (), &noutput_items, sizeof(int)); + d_socket->send(request); } zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; @@ -77,14 +77,13 @@ namespace gr { // If we got a reply, process if (itemsin[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t reply; - d_socket->recv(&reply); + // Receive data + zmq::message_t reply; + d_socket->recv(&reply); - // Copy to ouput buffer and return - memcpy(out, (void *)reply.data(), reply.size()); - - return reply.size()/d_itemsize; + // Copy to ouput buffer and return + memcpy(out, (void *)reply.data(), reply.size()); + return reply.size()/(d_itemsize*d_vlen); } return 0; diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h index 19cab164ac..c906e9137d 100644 --- a/gr-zeromq/lib/req_source_impl.h +++ b/gr-zeromq/lib/req_source_impl.h @@ -33,6 +33,7 @@ namespace gr { { private: size_t d_itemsize; + size_t d_vlen; zmq::context_t *d_context; zmq::socket_t *d_socket; diff --git a/gr-zeromq/python/zeromq/qa_zeromq_pushpull.py b/gr-zeromq/python/zeromq/qa_zeromq_pushpull.py new file mode 100755 index 0000000000..72c024e6a2 --- /dev/null +++ b/gr-zeromq/python/zeromq/qa_zeromq_pushpull.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 202013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +from gnuradio import blocks, zeromq +from gnuradio import eng_notation + +class qa_zeromq_pushpull (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_t (self): + src_data = [1,2,3,4,5,6,7,8,9]*100 + print src_data + src = blocks.vector_source_c(src_data, False, 1) + zeromq_push_sink = zeromq.push_sink(gr.sizeof_gr_complex, 1, "tcp://*:5555", True) + zeromq_pull_source = zeromq.pull_source(gr.sizeof_gr_complex, 1, "tcp://localhost:5555", 0.1) + sink = blocks.vector_sink_c() +# print sink.data + self.tb.connect(src, zeromq_push_sink) + self.tb.connect(zeromq_pull_source, sink) + self.tb.run () + self.assertEqual(sink.data(), src_data) + +if __name__ == '__main__': + gr_unittest.run(qa_zeromq_pushpull, "qa_zeromq_pushpull.xml") diff --git a/gr-zeromq/python/zeromq/qa_zeromq_reqrep.py b/gr-zeromq/python/zeromq/qa_zeromq_reqrep.py new file mode 100755 index 0000000000..e52047253d --- /dev/null +++ b/gr-zeromq/python/zeromq/qa_zeromq_reqrep.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 202013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +from gnuradio import blocks, zeromq +from gnuradio import eng_notation + +class qa_zeromq_reqrep (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_t (self): + src_data = [1,2,3,4,5,6,7,8,9]*100 + print src_data + src = blocks.vector_source_c(src_data, False, 1) + zeromq_rep_sink = zeromq.rep_sink(gr.sizeof_gr_complex, 1, "tcp://*:5555", 0.1, True) + zeromq_req_source = zeromq.req_source(gr.sizeof_gr_complex, 1, "tcp://localhost:5555") + sink = blocks.vector_sink_c() +# print sink.data + self.tb.connect(src, zeromq_rep_sink) + self.tb.connect(zeromq_req_source, sink) + self.tb.run () + self.assertEqual(sink.data(), src_data) + +if __name__ == '__main__': + gr_unittest.run(qa_zeromq_reqrep, "qa_zeromq_reqrep.xml") |