Net::STOMP::Client::Tutorial(3) User Contributed Perl Documentation Net::STOMP::Client::Tutorial(3)

Net::STOMP::Client::Tutorial - Getting started with STOMP and Net::STOMP::Client

Here is a five-part tutorial to get started with Net::STOMP::Client.

A basic knowledge of STOMP is required. For this, you can read:

  • <http://stomp.github.com/stomp-specification-1.2.html>

    the STOMP 1.2 protocol specification

  • <http://fusesource.com/docs/broker/5.5/connectivity_guide/FMBConnectivityStompTelnet.html>

    the Fuse Message Broker STOMP Tutorial

Net::STOMP::Client, like similar modules under the Net::* hierarchy, provides an object-oriented interface to a network protocol.

In order to connect to a broker, you first have to create an object. This object will later be used to interact with the broker. When the module creates the object, it tries to connect to the broker using either plain TCP or SSL. Nothing is done at the STOMP level.

To create the object, you of course need to specify where to connect to. This can be done either with a single "uri" parameter:

  $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");

or with the pair of "host" and "port" parameters:

  $stomp = Net::STOMP::Client->new(
      host => "mybroker",
      port => 6163,
  );

Using SSL is more complex since more parameters have to be given. Note: IO::Socket::SSL is used behind the scenes so you can refer to its documentation for more information. Here is a complete example with certificate-based authentication:

  $stomp = Net::STOMP::Client->new(
      uri => "stomp+ssl://mybroker:6162",
      sockopts => {
          # path of the directory containing trusted certificates
          SSL_ca_path     => "/etc/ssl/ca",
          # client certificate to present
          SSL_cert_file   => "/etc/ssl/client-cert.pem",
          # client private key
          SSL_key_file    => "/etc/ssl/client-key.pem",
          # passphrase of the client private key
          SSL_passwd_cb   => sub { return("secret") },
          # verify mode (SSL_VERIFY_PEER)
          SSL_verify_mode => 1,
      },
  );

Note: recent versions of IO::Socket::SSL issue a warning if the verify mode is not explicitly set.

Once connected, at TCP or SSL level, you can get information about the broker using the "peer" method. For instance:

  $peer = $stomp->peer();
  printf("connected to broker %s (IP %s), port %d\n",
      $peer->host(), $peer->addr(), $peer->port());

After creating the broker object, you must start a STOMP session by sending a "CONNECT" frame. This is as simple as:

  $stomp->connect();

If authentication is required, you must pass extra information at this stage. For instance:

  $stomp->connect(
      login    => "guest",
      passcode => "welcome",
  );

In case the broker uses virtual hosts, you can add a "host" parameter to the "connect" method:

  $stomp->connect(
      login    => "guest",
      passcode => "welcome",
      host     => "/",
  );

If it is not given explicitly, the "host" parameter defaults to whatever has been given in the "new" method, via the "host" or "uri" parameter.

At this point, the session has been established and you can now send and/or receive messages.

Once connected at STOMP level, you can get information about the broker using the "server" and "version" methods. For instance:

  printf("speaking STOMP %s with server %s\n",
      $stomp->version(), $stomp->server() || "UNKNOWN");

When you are done with messaging, you should gracefully end the session by sending a "DISCONNECT" frame with:

  $stomp->disconnect();

Note: STOMP does not support reconnection. Once the session has been ended, the broker object cannot be used anymore.

Putting all this together, here is a complete program that simply connects, starts and ends a session, printing information along the way.

  use Net::STOMP::Client;
  $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
  $peer = $stomp->peer();
  printf("connected to broker %s (IP %s), port %d\n",
      $peer->host(), $peer->addr(), $peer->port());
  $stomp->connect();
  printf("speaking STOMP %s with server %s\n",
      $stomp->version(), $stomp->server() || "UNKNOWN");
  printf("session %s started\n", $stomp->session());
  $stomp->disconnect();
  printf("session ended\n");

A message is made of a header (a list of key/value pairs) and a body. Both are optional.

To send a message, you have to issue a "SEND" frame. For instance:

  $stomp->send(
      destination => "/queue/test",
      subject     => "this is a test",
      time        => time(),
      body        => "Hello world!\n",
  );

By default, you do not get any confirmation that the message has indeed been received by the broker. If you want such a confirmation, you have to use receipts. The following code snippet sends two messages with a "receipt" header containing a pseudo-unique id and waits for matching "RECEIPT" frames coming from the broker. This is easy because the Net::STOMP::Client module keeps track of which receipts are expected and have not been received yet.

  $stomp->send(
      destination => "/queue/test",
      body        => "Test of receipts 1...\n",
      receipt     => $stomp->uuid(),
  );
  $stomp->send(
      destination => "/queue/test",
      body        => "Test of receipts 2...\n",
      receipt     => $stomp->uuid(),
  );
  # wait at most 10 seconds for all pending receipts
  $stomp->wait_for_receipts(timeout => 10);
  # complain if some receipts are still pending
  die("Receipt not received!\n") if $stomp->receipts();

Note: all STOMP frames can carry a "receipt" header so this is not restricted to message sending.
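
As a sketch of a receipt on a frame other than "SEND", the helper below asks for a receipt on the "SUBSCRIBE" frame and dies if the broker does not confirm it in time. The "subscribe_confirmed" function is hypothetical (it is not part of Net::STOMP::Client); it only uses methods shown in this tutorial and assumes that $stomp is an object with an established session:

  use strict;
  use warnings;

  # hypothetical helper: subscribe and check that the broker confirmed it
  sub subscribe_confirmed {
      my($stomp, $destination, $timeout) = @_;
      # generate a subscription id, as required by STOMP 1.1 and above
      my $sid = $stomp->uuid();
      $stomp->subscribe(
          destination => $destination,
          id          => $sid,
          # ask for a RECEIPT frame matching this SUBSCRIBE frame
          receipt     => $stomp->uuid(),
      );
      # wait at most $timeout seconds for all pending receipts
      $stomp->wait_for_receipts(timeout => $timeout);
      # complain if the receipt is still pending
      die("SUBSCRIBE not confirmed within $timeout seconds\n")
          if $stomp->receipts();
      return($sid);
  }

It could be called as subscribe_confirmed($stomp, "/queue/test", 10); the returned id can later be given to "unsubscribe". Since "MESSAGE" frames may arrive while waiting for the receipt, a message callback probably needs to be set beforehand.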

In addition, you can use transactions to group the sending of several messages so that either none or all of them get handled by the broker.

  # create a unique transaction id
  $tid = $stomp->uuid();
  # begin the transaction
  $stomp->begin(transaction => $tid);
  # send two messages as part of this transaction
  $stomp->send(
      destination => "/queue/test1",
      body        => "message 1",
      transaction => $tid,
  );
  $stomp->send(
      destination => "/queue/test2",
      body        => "message 2",
      transaction => $tid,
  );
  # either abort or commit ($failed is a placeholder for your own check)
  if ($failed) {
      # abort/rollback the transaction
      $stomp->abort(transaction => $tid);
      # no messages have been delivered to the broker
  } else {
      # commit the transaction
      $stomp->commit(transaction => $tid);
      # both messages have been delivered to the broker
  }
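
One possible way to decide between "abort" and "commit" is to trap errors with "eval". The sketch below wraps the sends in a hypothetical "send_in_transaction" helper (not part of Net::STOMP::Client). It assumes that $stomp has an established session and that the "send" method dies on failure:

  use strict;
  use warnings;

  # hypothetical helper: send all the given messages in one transaction;
  # each message is a reference to a hash of "send" options
  sub send_in_transaction {
      my($stomp, @messages) = @_;
      # create a unique transaction id and begin the transaction
      my $tid = $stomp->uuid();
      $stomp->begin(transaction => $tid);
      # try to send everything, trapping any fatal error
      my $ok = eval {
          foreach my $message (@messages) {
              $stomp->send(%{ $message }, transaction => $tid);
          }
          1;
      };
      if ($ok) {
          $stomp->commit(transaction => $tid);
      } else {
          # save the error first: the abort below may overwrite $@
          my $error = $@ || "unknown error";
          # note: this may fail too if the connection itself is broken
          $stomp->abort(transaction => $tid);
          die("transaction $tid aborted: $error");
      }
      return($tid);
  }

It could be called with a list such as { destination => "/queue/test1", body => "message 1" }, { destination => "/queue/test2", body => "message 2" }.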

In order to receive frames, you first have to subscribe to one or more destinations. This is as easy as:

  $stomp->subscribe(destination => "/queue/test");

When you are done, you simply unsubscribe with:

  $stomp->unsubscribe(destination => "/queue/test");

It is good practice to add an "id" header to uniquely identify the subscription. All messages that are part of this subscription will have a matching "subscription" header. This "id" can also be used to unsubscribe.

In fact, the code above only works with STOMP 1.0. In STOMP 1.1 and above, the "id" header has been made mandatory so you must use something like:

  $stomp->subscribe(
      destination => "/queue/test",
      id          => "testsub",
  );
  # received messages will contain: subscription:testsub
  $stomp->unsubscribe(id => "testsub");

While you are subscribed to some destinations, the broker may decide at any time to send you "MESSAGE" frames. You can process these frames with a simple loop:

  while ($frame = $stomp->wait_for_frames()) {
      # ... do something with the received frame ...
  }

The code above is blocking and will loop forever. You can add a "timeout" option to have a non-blocking loop:

  while (1) {
      # wait at most one second for a new frame
      $frame = $stomp->wait_for_frames(timeout => 1);
      # do what is appropriate
      if ($frame) {
          # ... do something with the received frame ...
      } else {
          # nothing received
      }
  }
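
The loop above never ends. As a variation, the sketch below receives frames for a limited amount of time only and then returns what it got. The "receive_for" function is a hypothetical helper (not part of Net::STOMP::Client); it assumes that $stomp has an established session and relies on "wait_for_frames" returning the received frame, or a false value on timeout, as in the loops above:

  use strict;
  use warnings;

  # hypothetical helper: collect the frames received during
  # (approximately) the given number of seconds
  sub receive_for {
      my($stomp, $seconds) = @_;
      my $deadline = time() + $seconds;
      my @frames;
      while (time() < $deadline) {
          # wait at most one second for a new frame
          my $frame = $stomp->wait_for_frames(timeout => 1);
          push(@frames, $frame) if $frame;
      }
      return(@frames);
  }

Since time() has a resolution of one second, the total duration is only approximate. A call such as receive_for($stomp, 30) would return all the frames received, of any type, during about thirty seconds.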

Because of the asynchronous nature of STOMP, receiving messages is a bit tricky: you cannot know a priori which types of frames will be sent when. For instance, you may want to send messages (with receipts) while you are subscribed to some destinations and you may receive a "MESSAGE" frame while you would like to wait for a "RECEIPT" frame, or vice versa.

The "wait_for_frames" method described above will wait for any frame, not only message frames. It is up to you to check whether what you receive is a "MESSAGE" frame or not. This can be done with something like:

  if ($frame->command() eq "MESSAGE") {
      # ... do something with the received message ...
  } else {
      # something else than a message frame
  }

Putting all this together, here is a complete program that receives ten messages from "/queue/test":

  use Net::STOMP::Client;
  $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
  # the next line will be explained in the next part of the tutorial ;-)
  $stomp->message_callback(sub { return(1) });
  $stomp->connect();
  $sid = $stomp->uuid();
  $stomp->subscribe(
      destination => "/queue/test",
      # we use the generated subscription id
      id          => $sid,
      # we want a receipt on our SUBSCRIBE frame
      receipt     => $stomp->uuid(),
  );
  $count = 0;
  while ($count < 10) {
      $frame = $stomp->wait_for_frames(timeout => 1);
      if ($frame) {
          if ($frame->command() eq "MESSAGE") {
              $count++;
              printf("received message %d with id %s\n",
                     $count, $frame->header("message-id"));
          } else {
              # this will catch the RECEIPT frame
              printf("%s frame received\n", $frame->command());
          }
      } else {
          print("waiting for messages...\n");
      }
  }
  $stomp->unsubscribe(id => $sid);
  $stomp->disconnect();

As seen in part 3, because of the asynchronous nature of STOMP, it is a bit tricky to properly handle all the different types of frames that can be received.

In order to simplify this, Net::STOMP::Client supports the use of callbacks. They are pieces of code called in well defined situations. In fact, there are two levels of callbacks: global and local.

Global (per command) callbacks are called each time a frame is received. Net::STOMP::Client has default callbacks that should be sufficient for all types of frames, except for "MESSAGE" frames. For these, it is really up to you to define what to do with the received messages.

Here is an example with a message callback counting the messages received:

  $stomp->message_callback(sub {
      my($self, $frame) = @_;
      $count++;
      return($self);
  });

These callbacks are global in nature and it is good practice not to change them during a session. If you do not need a global message callback, you can supply the dummy:

  $stomp->message_callback(sub { return(1) });
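
A global message callback can also simply store what it receives so that the rest of the program processes it later. Here is a minimal sketch of this idea; the "collect_messages" function and the layout of the stored records are hypothetical, only "message_callback", "header" and "body" come from this tutorial:

  use strict;
  use warnings;

  # hypothetical helper: install a global message callback that
  # appends each received message to the given array reference
  sub collect_messages {
      my($stomp, $inbox) = @_;
      $stomp->message_callback(sub {
          my($self, $frame) = @_;
          push(@{ $inbox }, {
              id   => $frame->header("message-id"),
              body => $frame->body(),
          });
          # return a true value, like the other callbacks shown here
          return($self);
      });
      return;
  }

After collect_messages($stomp, \@inbox), each call to "wait_for_frames" that receives a "MESSAGE" frame would add one record to @inbox.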

Here is how to re-write a simplified version of the inner part of the receiving program of part 3 with a global callback:

  $count = 0;
  sub msg_cb ($$) {
      my($self, $frame) = @_;
      $count++;
      printf("received message %d with id %s\n",
             $count, $frame->header("message-id"));
      return($self);
  }
  $stomp->message_callback(\&msg_cb);
  $stomp->wait_for_frames() while $count < 10;

Local (per invocation) callbacks are called by the "wait_for_frames" method. Their return value controls what "wait_for_frames" does:
  • false: "wait_for_frames" should wait for more frames
  • true: "wait_for_frames" can stop and return this value

Here is how to use "wait_for_frames" with a local callback to wait until we receive a "MESSAGE" frame that contains "quit" in the body:

  sub msg_cb ($$) {
      my($self, $frame) = @_;
      return(0) unless $frame->command() eq "MESSAGE";
      return(0) unless $frame->body() =~ /quit/;
      return($frame);
  }
  $frame = $stomp->wait_for_frames(callback => \&msg_cb);

As you see, you can put the logic either in the global callbacks or in the local callbacks. The best practice is to have a single global message callback that does not change throughout the execution of the program and to optionally put in local callbacks what may change from one place of the program to another.

Here is how to re-write the receiving program of part 3 with a global callback only counting the number of messages and a local callback printing information:

  use Net::STOMP::Client;
  $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
  $stomp->connect();
  sub msg_cb ($$) {
      my($self, $frame) = @_;
      my $cmd = $frame->command();
      if ($cmd eq "MESSAGE") {
          printf("received message %d with id %s\n",
                 $count, $frame->header("message-id"));
      } else {
          printf("%s frame received\n", $cmd);
      }
      return($frame);
  }
  $stomp->message_callback(sub { $count++; return(1) });
  $sid = $stomp->uuid();
  $stomp->subscribe(
      destination => "/queue/test",
      id          => $sid,
      receipt     => $stomp->uuid(),
  );
  $count = 0;
  while ($count < 10) {
      $stomp->wait_for_frames(
          callback => \&msg_cb,
          timeout => 1,
      ) or print("waiting for messages...\n");
  }
  $stomp->unsubscribe(id => $sid);
  $stomp->disconnect();

Unless specified otherwise, subscriptions are made in "auto" mode, meaning that a message is considered to be delivered by the broker as soon as it sends the corresponding "MESSAGE" frame. The client may not receive the frame or it could exit before processing it. This could result in message loss.

In order to avoid message loss, one can change the subscription acknowledgment mode to be "client" instead of "auto". This is an option of the "SUBSCRIBE" frame:

  $stomp->subscribe(
      destination => "/queue/test",
      id          => $sid,
      ack         => "client",
  );

In "client" mode, the client must explicitly acknowledge the messages it has successfully processed. This is achieved by sending an "ACK" frame with a "message-id" header matching that of the received message:

  $stomp->ack("message-id" => $frame->header("message-id"));

In practice, this is more complex since the exact way to acknowledge messages changed between STOMP 1.0 (only "message-id" required), STOMP 1.1 (both "message-id" and "subscription" required) and STOMP 1.2 (only "id" required).

Net::STOMP::Client has a unique feature to ease acknowledgments: you can directly pass the frame that holds the received message and the module will properly acknowledge it, regardless of the STOMP protocol version used:

  $stomp->ack(frame => $frame);

Messages that have not been acknowledged by the end of the session will be resent by the broker.
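
Putting the "client" mode pieces together, here is a sketch of a consumer that acknowledges a message only after it has been processed. The "consume_with_ack" function and its $handler argument (a code reference returning true on success) are hypothetical. The sketch assumes that $stomp has an established session and that a global message callback has already been set:

  use strict;
  use warnings;

  # hypothetical helper: process and acknowledge up to $max messages
  sub consume_with_ack {
      my($stomp, $destination, $handler, $max) = @_;
      my $sid = $stomp->uuid();
      $stomp->subscribe(
          destination => $destination,
          id          => $sid,
          ack         => "client",
      );
      my $count = 0;
      while ($count < $max) {
          my $frame = $stomp->wait_for_frames(timeout => 1);
          # ignore timeouts and frames that are not messages
          next unless $frame and $frame->command() eq "MESSAGE";
          # stop at the first failure: in STOMP 1.1 and above, a later
          # ACK in "client" mode would also cover this message
          last unless $handler->($frame);
          # let the module build the ACK frame for the STOMP version used
          $stomp->ack(frame => $frame);
          $count++;
      }
      $stomp->unsubscribe(id => $sid);
      return($count);
  }

The function returns the number of acknowledged messages. The loop stops at the first failure because, according to the protocol specification, acknowledgments in "client" mode are cumulative in STOMP 1.1 and above.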

Note that STOMP 1.1 and above also have a "client-individual" mode. Please consult the protocol specification for more details.

The high-level methods handle one frame at a time. This can be inefficient for small frames. For instance, the "send" method will build a frame object, encode it and send it on the wire with at least one call to "syswrite", maybe for very few bytes.

The low-level methods allow you to better control this and queue messages in memory before sending them. This way, you group data and use I/O more efficiently.

Here is how to queue ten messages and send them in one go.

  foreach my $n (1 .. 10) {
      $frame = Net::STOMP::Client::Frame->new(
          command => "SEND",
          headers => { destination => "/topic/test" },
          body    => "message $n",
      );
      # simply add the frame to the outgoing queue
      $stomp->queue_frame($frame);
  }
  # no timeout given: block until all data has been sent
  $stomp->send_data();

IO::Socket::SSL, Net::STOMP::Client, Net::STOMP::Client::Connection, Net::STOMP::Client::Frame, Net::STOMP::Client::HeartBeat, Net::STOMP::Client::Peer, Net::STOMP::Client::Receipt, Net::STOMP::Client::Version.

Lionel Cons <http://cern.ch/lionel.cons>

Copyright (C) CERN 2010-2017

2017-01-31 perl v5.32.1
