Joseph Mullins

Programmer | Security | Automation

joseph@ropeney.com

ActiveMQ brokering between Ruby and Java

Back

A collection of Letter Boxes

 

It's a constant challenge for software houses to be able to keep on top of which frameworks are best and what language the cool things are being built in. It almost feels like every year the language you must use has changed. For long term companies this can be hard to be able to leverage. This is where a message queue can become invaluable. By designing your applications to use a message queue, it makes integrating the tools of multiple languages super simple. The message broker provides a common medium to allow all the applications to talk to each other, without caring for what language they were built in. Adopting a message queue allows you to seemlessly integrate your Wordpress front end with your Ruby on Rails while having your SMS notifications go through your Java service. 

ActiveMQ

ActiveMQ is a widely popular message queue. It is written in the popular, powerful Java programming language. Getting started is extremely easy and it provides libraries for most popular languages. The following will include an extremely simple way to demonstrate the use of a message queue. The code expands on the well documented tutorials provided by ActiveMQ.

Installation

If you've experienced using Elasticsearch, another popular Java platform, then you will have no issues getting ActiveMQ setup in a delopment environment.

Java Client

We will starting by using the ActiveMQ Hello World tutorial to create a simple example of a Java program that sends to the queue and takes a message off the queue. This will show that our setup is working and explain the basic principles.

package com.company;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQBytesMessage;

import javax.jms.*;

public class Main {

    public static void main(String[] args) {
        HelloWorldProducer producer = new HelloWorldProducer();
        HelloWorldConsumer consumer = new HelloWorldConsumer();

        Thread threadProducer = new Thread(producer);
        threadProducer.start();

        Thread threadConsumer = new Thread(consumer);
        threadConsumer.start();
    }

    public static class HelloWorldProducer implements Runnable {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "tcp://localhost:61616");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();

                // Create a Session
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("WelcomeQueue");

                // Create a MessageProducer from the Session to the Topic or
                // Queue
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                // Create a messages
                String text = "Hello world! From: "
                        + Thread.currentThread().getName() + " : "
                        + this.hashCode();
                TextMessage message = session.createTextMessage(text);

                // Tell the producer to send the message
                System.out.println("Sent message: " + message.hashCode()
                        + " : " + Thread.currentThread().getName());
                producer.send(message);

                // Clean up
                session.close();
                connection.close();

            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "tcp://localhost:61616");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();

                connection.setExceptionListener(this);

                // Create a Session
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("WelcomeQueue");

                // Create a MessageConsumer from the Session to the Topic or
                // Queue
                MessageConsumer consumer = session.createConsumer(destination);
              
                Message message = consumer.receive(5000);

                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }
                
                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }

        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occured.  Shutting down client.");
        }
    }
}

Your thread ID's should be different, but the basic output should be similar

Sent message: 604068378 : Thread-0
Received: Hello world! From: Thread-0 : 106172800

Ruby Publisher

Now we will make a simple modification so that we can receive ByteMessages, we have to be able to take ByteMessages since Ruby's gem Stomp can only send ByteMessage as it's types are dynamic.

package com.company;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQBytesMessage;


import javax.jms.*;

public class Main {

    public static void main(String[] args) {
        HelloWorldConsumer consumer = new HelloWorldConsumer();
        Thread threadConsumer = new Thread(consumer);
        threadConsumer.start();
    }

    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "tcp://localhost:61616");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();

                connection.setExceptionListener(this);

                // Create a Session
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("WelcomeQueue");

                // Create a MessageConsumer from the Session to the Topic or
                // Queue
                MessageConsumer consumer = session.createConsumer(destination);

                // Just keep looping to try reading up to 10 times.
                for(int i = 0; i < 100; i++) {
                    Message message = consumer.receive(5000);

                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        String text = textMessage.getText();
                        System.out.println("Received: " + text);
                    } else if (message instanceof ActiveMQBytesMessage) {
                        ActiveMQBytesMessage textMessage = (ActiveMQBytesMessage) message;
                        byte[] b = new byte[(int) textMessage.getBodyLength()];
                        textMessage.readBytes(b);
                        System.out.println("Received: " + new String(b));
                    }
                    else {
                        System.out.println("Received: " + message);
                    }
                }

                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }

        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occured.  Shutting down client.");
        }
    }
}

Now we can make a simple Ruby script to illustrate how easy it is to send a message to the queue and be consumed by the Java application. You will need to install the Stomp gem gem install stomp

require 'stomp'

conn = Stomp::Client.new("localhost", 61613)

conn.publish '/queue/WelcomeQueue', 'Hello From Ruby'

You should now get a message similar

Received: null
Received: Hello From Ruby
Received: null

Finishing Notes

An example application that might make use of this service would be adding SMS functionality to your system. You might create a Rails app where people create an account, this would then send a message to the message queue that an SMS has to be sent; containing the phone number and message. An SMS application, possibly written in another language, would be sitting subscribed to the message queue and trigger creating an SMS once the message has been published. Tomorrow when you replace the SMS application with your favourite NodeJS implementation, you don't need to change any application using it.

Comments