I became acquainted with ZeroMQ on a fairly involved network-messaging project last year, and I’ve since started thinking about using it for smaller applications as well.
Take this, for example.
#!/usr/bin/perl -Tw
# file: sender.pl
use strict;
use warnings;
use ZeroMQ qw( ZMQ_XREQ );
my $ctx = ZeroMQ::Context->new();
my $skt = $ctx->socket( ZMQ_XREQ );
$skt->bind("tcp://*:70000");
while (1) {
$skt->send("hello world");
sleep 1;
}
#!/usr/bin/perl -Tw
# file: receiver.pl
use strict;
use warnings;
use ZeroMQ qw( ZMQ_XREP );
my $ctx = ZeroMQ::Context->new();
my $skt = $ctx->socket( ZMQ_XREP );
$skt->connect("tcp://127.0.0.1:70000");
while (1) {
my $sender_id = $skt->recv();   # first frame: the sender's identity
my $msg = $skt->recv();         # second frame: the message body
print "msg: ", $msg->data(), "\n";
}
Running this example, we see what we’d expect:
dylan@doxey.org$: ./receiver.pl
msg: hello world
msg: hello world
msg: hello world
msg: hello world
msg: hello world
msg: hello world
...
But there are some properties of this ZMQ connection which are not evident here.
• The call to send(…) blocks until a receiver has connected; with no peer available, the XREQ socket has nowhere to queue the message.
• When there are multiple receivers, the messages are distributed round-robin among the receiver instances.
To illustrate these points, I’m going to prefix each message with a timestamp and a counter so that every message is distinct.
#!/usr/bin/perl -Tw
# file: sender.pl
use strict;
use warnings;
use ZeroMQ qw( ZMQ_XREQ );
my $ctx = ZeroMQ::Context->new();
my $skt = $ctx->socket( ZMQ_XREQ );
$skt->bind("tcp://*:70000");
my $n = 0;
while (1) {
$skt->send( time . ':' . $n++ . ': hello world' );
sleep 1;
}
I’ll start sender.pl, wait a few seconds and then start two instances of receiver.pl.
dylan@doxey.org$: ./receiver.pl
msg: 1363457771:0: hello world
msg: 1363457777:1: hello world
msg: 1363457778:2: hello world
msg: 1363457779:3: hello world
msg: 1363457781:5: hello world
msg: 1363457783:7: hello world
...
|
dylan@doxey.org$: ./receiver.pl
msg: 1363457780:4: hello world
msg: 1363457782:6: hello world
msg: 1363457784:8: hello world
...
|
Here you can see that sender.pl sent message 0 at 1363457771 and then blocked, waiting for receiver.pl to come online, which happened at about 1363457776. After that the one-second interval is apparent until the second instance of receiver.pl comes online, at which point the round-robin distribution begins with message 4.
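The alternating message numbers in that output are easy to reproduce with a toy model in plain Perl. This is only a sketch of the distribution rule, not of ZeroMQ itself: the `round_robin` helper and the receiver names are hypothetical, and a real XREQ socket deals only to peers that are currently connected.

```perl
use strict;
use warnings;

# Toy model of XREQ load balancing (no ZeroMQ involved): each outgoing
# message goes to the next receiver in turn, wrapping around at the end.
sub round_robin {
    my ( $receivers, @messages ) = @_;
    my %inbox;
    $inbox{$_} = [] for @$receivers;
    my $next = 0;
    for my $msg (@messages) {
        my $receiver = $receivers->[ $next++ % @$receivers ];
        push @{ $inbox{$receiver} }, $msg;
    }
    return \%inbox;
}

my $inbox = round_robin( [ 'receiver-1', 'receiver-2' ], 0 .. 5 );
for my $receiver ( sort keys %$inbox ) {
    print "$receiver: @{ $inbox->{$receiver} }\n";
}
# receiver-1: 0 2 4
# receiver-2: 1 3 5
```

Adding a third name to the receiver list spreads the same messages three ways, which is what happens when another receiver.pl joins.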
I would like to eliminate the blocking; I’d prefer sender.pl to simply fire and forget.
#!/usr/bin/perl -Tw
# file: sender.pl
use strict;
use warnings;
use ZeroMQ qw( ZMQ_XREQ ZMQ_NOBLOCK );
my $ctx = ZeroMQ::Context->new();
my $skt = $ctx->socket( ZMQ_XREQ );
$skt->bind("tcp://*:70000");
my $n = 0;
while (1) {
$skt->send( time . ':' . $n++ . ': hello world', ZMQ_NOBLOCK );
sleep 1;
}
Repeating the test:
dylan@doxey.org$: ./receiver.pl
msg: 1363466086:10: hello world
msg: 1363466087:11: hello world
msg: 1363466088:12: hello world
msg: 1363466090:14: hello world
|
dylan@doxey.org$: ./receiver.pl
msg: 1363466089:13: hello world
msg: 1363466091:15: hello world
msg: 1363466093:17: hello world
|
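In this case we can see that messages 0 through 9 were lost before I started the first instance of receiver.pl: with ZMQ_NOBLOCK, a send with no connected peer fails immediately (with EAGAIN) instead of waiting, so those messages were simply discarded. Once the second instance of receiver.pl starts, the round-robin distribution resumes.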
Next I’d like to make a tweak that eliminates the round-robin distribution and instead sends every message to all running instances of the receiver: a true pub-sub model. Perl examples of this are a little spotty, so here’s a minimal ZMQ pubsub in Perl.
#!/usr/bin/perl -Tw
# file: publish.pl
use strict;
use warnings;
use ZeroMQ qw( ZMQ_PUB );
my $ctx = ZeroMQ::Context->new();
my $skt = $ctx->socket( ZMQ_PUB );
$skt->bind("tcp://*:70000");
my $n = 0;
while (1) {
$skt->send( time . ':' . $n++ . ': hello world' );
sleep 1;
}
#!/usr/bin/perl -Tw
# file: subscribe.pl
use strict;
use warnings;
use ZeroMQ qw( ZMQ_SUB ZMQ_SUBSCRIBE );
my $ctx = ZeroMQ::Context->new();
my $skt = $ctx->socket( ZMQ_SUB );
$skt->setsockopt( ZMQ_SUBSCRIBE, "" );
$skt->connect("tcp://127.0.0.1:70000");
while (1) {
my $msg = $skt->recv();
print "msg: ", $msg->data(), "\n";
sleep 2;
}
The obvious change here is the use of the socket types ZMQ_PUB and ZMQ_SUB. The one that didn’t come easily was the ZMQ_SUBSCRIBE socket option in subscribe.pl. The empty-string value for ZMQ_SUBSCRIBE tells ZMQ that I’m interested in all messages published on that socket; a non-empty value would instead tell ZMQ to filter out any messages that don’t begin with that prefix. If you omit the ZMQ_SUBSCRIBE option, ZMQ has no subscription to match messages against, and the socket will receive nothing. Nice.
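To pin down that matching rule, here’s a toy model in plain Perl of how a publisher’s messages fan out to subscribers. It isn’t the ZeroMQ API; `wants`, `publish` and the subscriber names are hypothetical. Each subscriber holds a list of prefixes standing in for its ZMQ_SUBSCRIBE options, and it receives a copy of any message that starts with one of them.

```perl
use strict;
use warnings;

# Toy model of PUB/SUB delivery (no ZeroMQ involved). A subscriber gets a
# copy of a message if any of its subscriptions is a prefix of the message.
sub wants {
    my ( $msg, @prefixes ) = @_;
    for my $prefix (@prefixes) {
        # The empty string is a prefix of every message.
        return 1 if substr( $msg, 0, length $prefix ) eq $prefix;
    }
    return 0;    # no subscriptions at all: nothing matches
}

# Returns the sorted names of every subscriber that receives $msg.
sub publish {
    my ( $subscribers, $msg ) = @_;
    my @receivers =
      grep { wants( $msg, @{ $subscribers->{$_} } ) } keys %$subscribers;
    return sort @receivers;
}

my %subscribers = (
    everything => [''],              # like ZMQ_SUBSCRIBE with ""
    filtered   => ['1363470605'],    # a non-empty prefix filter
    forgot     => [],                # ZMQ_SUBSCRIBE never set
);

for my $msg ( '1363470605:59: hello world', '1363470606:60: hello world' ) {
    my @got = publish( \%subscribers, $msg );
    print "$msg -> @got\n";
}
# 1363470605:59: hello world -> everything filtered
# 1363470606:60: hello world -> everything
```

Unlike the round-robin case, one message can land in several inboxes at once, and the subscriber that never set a subscription gets nothing.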
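Running publish.pl with two instances of subscribe.pl gives us the results we’d expect: each subscriber receives every message published while it is connected.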
dylan@doxey.org$: ./subscribe.pl
msg: 1363470603:57: hello world
msg: 1363470604:58: hello world
msg: 1363470605:59: hello world
msg: 1363470606:60: hello world
msg: 1363470607:61: hello world
...
|
dylan@doxey.org$: ./subscribe.pl
msg: 1363470605:59: hello world
msg: 1363470606:60: hello world
msg: 1363470607:61: hello world
msg: 1363470608:62: hello world
msg: 1363470609:63: hello world
...
|
Consider this scenario …
my $failed = system "program.pl &";
die 'failed to start program.pl'
if $failed;
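Given the trailing &, this program runs on its own, and you have no access to its STDIN, STDOUT or exit status. All the return value really tells us is whether the shell itself ran: the shell puts program.pl in the background and immediately exits with status 0, so even a program.pl that can’t be launched isn’t reported as a failure.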
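It’s easy to check how little that return value tells you. This sketch backgrounds a path that deliberately doesn’t exist (`/nonexistent/program.pl` is a made-up name) and still gets 0 back from `system`, because POSIX defines the exit status of an asynchronous list as zero.

```perl
use strict;
use warnings;

# '&' makes /bin/sh start the command asynchronously and exit at once.
# The shell reports 0 for an asynchronous list, so system() "succeeds"
# even though /nonexistent/program.pl (a deliberately bogus path) can't run.
my $status = system '/nonexistent/program.pl 2>/dev/null &';
print "system returned $status\n";    # system returned 0
```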
Let’s try putting this to use.
Imagine a scenario where you have a program which wants to dispatch jobs as separate processes. After issuing the jobs the dispatcher would like to get status reports from the job processes so that it can determine when all the jobs have completed successfully. (This is a fan-in pattern, where many senders are writing to a single receiver. This is the reverse of the pubsub fan-out pattern.)
Here’s an example of a job handler which does a little something.
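#!/usr/bin/perl -Tw
# file: job.pl
use strict;
use warnings;
use ZeroMQ qw( ZMQ_XREQ ZMQ_IDENTITY );
# The job's identity (e.g. 'A') is passed on the command line.
my ($id) = @ARGV;
my $ctx = ZeroMQ::Context->new();
my $skt = $ctx->socket(ZMQ_XREQ);
# Set the identity before connect() so the dispatcher sees it.
$skt->setsockopt( ZMQ_IDENTITY, $id );
$skt->connect('tcp://127.0.0.1:70000');
# Send a job-specific number of numeric status updates
# (ord('0') .. ord($id), i.e. 48 .. 65 for 'A'), then a final 'done'.
for my $i ( ord 0 .. ord $id, 'done' ) {
$skt->send("status: $i");
}
$skt->close();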
This program demonstrates how to use the ZMQ_IDENTITY socket option. When the dispatcher’s ZMQ_XREP socket receives a message from this ZMQ_XREQ socket, it prepends the sender’s identity as the first part of the multipart message, so the dispatcher can tell which job each status came from.
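The loop bounds in job.pl are a little cryptic, so here’s what they expand to. `ord` is a named unary operator and binds more tightly than `..` and the comma, so `ord 0 .. ord $id, 'done'` means `(ord('0') .. ord($id)), 'done'`: a job-specific run of numbers followed by a final `done`.

```perl
use strict;
use warnings;

# Expand job.pl's status list for a couple of job ids.
# ord 0 .. ord $id, 'done'  parses as  ( ord('0') .. ord($id) ), 'done'
for my $id (qw( A Z )) {
    my @statuses = ( ord 0 .. ord $id, 'done' );
    printf "%s sends %d messages: %s .. %s, %s\n",
        $id, scalar @statuses, $statuses[0], $statuses[-2], $statuses[-1];
}
# A sends 19 messages: 48 .. 65, done
# Z sends 44 messages: 48 .. 90, done
```

So later letters simply send more status updates before finishing, which gives the dispatcher jobs of varying length to watch.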
Here’s a job dispatcher.
#!/usr/bin/perl -Tw
use strict;
use warnings;
use ZeroMQ qw( ZMQ_XREP );
my $job_count = 0;
#
# Start Jobs
#
for my $id ( 'A' .. 'Z' ) {
my $command = "perl -Tw job.pl $id &";
{
local $ENV{PATH} = '/usr/bin';
system $command
and die "failed: $command";
}
$job_count++;
}
my $ctx = ZeroMQ::Context->new();
my $skt = $ctx->socket(ZMQ_XREP);
$skt->bind('tcp://*:70000');
my %is_done;
#
# Monitor Jobs
#
while ( keys %is_done < $job_count ) {
my $sender_id = $skt->recv()->data();
my $message = $skt->recv()->data();
print "$sender_id: $message\n";
if ( $message =~ m{: \s done \z}xms ) {
$is_done{$sender_id} = 1;
}
}
print "All jobs are done!\n";
This program launches the job processes with system calls, then creates a ZMQ_XREP socket to listen for status updates. The messages are queued and delivered in the order in which they were sent. There is no blocking. But if the dispatcher isn’t dequeuing messages as fast as they’re queuing up, a job’s final ‘done’ status may sit at the back of the queue, giving the false impression that the job is still running.
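The monitoring loop’s bookkeeping can be checked without any sockets. This sketch replays a hypothetical, interleaved stream of (identity, body) pairs, standing in for the two recv() calls per message in the dispatcher, through the same regex and %is_done hash. Because each job’s messages arrive in the order they were sent, ‘done’ is always that job’s last message, however the jobs interleave.

```perl
use strict;
use warnings;

# Hypothetical interleaved traffic from two jobs; each pair stands in for
# the identity frame and the body frame the dispatcher reads with recv().
my @frames = (
    [ 'A', 'status: 48' ],
    [ 'B', 'status: 48' ],
    [ 'A', 'status: done' ],
    [ 'B', 'status: 49' ],
    [ 'B', 'status: done' ],
);

my $job_count = 2;
my %is_done;
for my $pair (@frames) {
    last if keys %is_done >= $job_count;
    my ( $sender_id, $message ) = @$pair;
    print "$sender_id: $message\n";
    # Same test as the dispatcher: only a body ending in ": done" counts.
    $is_done{$sender_id} = 1 if $message =~ m{: \s done \z}xms;
}
print "All jobs are done!\n" if keys %is_done == $job_count;
```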
Further reading:
http://api.zeromq.org/2-1:zmq-socket
http://api.zeromq.org/2-1:zmq-setsockopt
Unfortunately, the ZeroMQ module (http://search.cpan.org/~dmaki/ZeroMQ-0.23/lib/ZeroMQ.pm) is marked as deprecated, and it’s what apt-get installs on Ubuntu 12.04. But I suppose it’s time to move on to ZMQ::LibZMQ3 (http://search.cpan.org/~dmaki/ZMQ-LibZMQ3-1.10/).