You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Introduction

This document describes how messages can be sent to and received from a messaging queue and how those messages can be processed in a Java class to add an order to a JobScheduler job chain.

No JobScheduler libraries are needed for this example.

Mode Of Operation

  • Creating a message producer to send messages to a Message Queue server (MQ server).
  • Creating a message consumer to receive messages from an MQ server.
  • Sending the message to a Jobscheduler.

The message has to be an XML snippet which is valid to the JobScheduler XML Schema to be processed on the desired JobScheduler.

Most of the implementation was done with the standard jms implementation of Java. The only class from the active MQ implementation is the ActiveMQConnectionFactory class. It shouldn´t be too complex, to change the implementation with the ConnectionFactory of your desired message queue service.

The MessageProducer

This example describes how to build a connection to an MQ server and send a message to a queue on this server. This example uses Apache Active MQ as the MQ server.

  1. Create a connection.
  2. Create a session.
  3. Create a destination.
  4. Create a producer.
  5. Send a message with the producer.
  6. Close the connection.

The Methods

To make the implementation more readable as well as reusable, the steps are separated in different methods. 

The createConnection(String url) method

This method instantiates a ConnectionFactory object with an ActiveMQConnectionFactory object and creates a Connection object through the factory method createConnection().

The ActiveMQConnectionFactory object has to be instantiated with the URL of the MQ server.

createConnection(String url)
public Connection createConnection(String uri){
    ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
    Connection connection = null;
    try {
        connection = factory.createConnection();
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to connect: " , e);
    }
    return connection;
}

The createSession(Connection connection) method

The method is called with an already instantiated Connection object and initiates a Session object through the Connection objects createSession(boolean transacted,  int acknowledgeMode) method.

createSession(Connection connection)
private Session createSession(Connection connection){
    Session session = null;
    try {
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to create Session: " , e);
    }
    return session;
}

The createDestination(Session session, String queueName) method

This method creates a Destination object. It is called with an active Session object and the name of the queue to write to. The Destination object is initiated through the createQueue(String name) method of the Session object.

createDestination(Session session)
public Destination createDestination(Session session, String queueName){
    Destination destination = null;
    try {
        destination = session.createQueue(queueName);
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to create Destination: " , e);
    }
    return destination;
}

The createMessageProducer(Session session, Destination destination) method

This method is called with an already active Session object as well as instantiated Destination object. It instantiates a MessageProducer object with the given session and destination.

createProducer(Session session, Destination destination)
private MessageProducer createMessageProducer(Session session, Destination destination){
    MessageProducer producer = null;
    try {
        producer = session.createProducer(destination);
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to create MessageProducer: " , e);
    }        
    return producer;
}

The write(String text, MessageProducer producer) method

The method is called with the message to send to the server and the MessageProducer object to use for publishing. The message is a String object. The method instantiates a Message object with the text to send.

write(String text)
public void write(String text, MessageProducer producer){
    Message message = null;
    try {
        if(text != null){
            message = session.createTextMessage(text);
        } else{
            message = session.createTextMessage(DEFAULT_TEXT);
        }
        producer.send(message);
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e);
    }
}

The close() method

This methods makes sure that the used connection will be closed after a message is sent.

close()
private void close(Connection connection){
    if (connection != null) {
        try {
            connection.close();
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to close the connection: " , e);
        }
    }
}

The SOSProducer class

The code example below slightly differs from the examples above. In the class below the methode write(String text) already uses the other method for instantiation, therfore it needs less parameters and it closes the connection itself.

The text in the example below consists of a JobScheduler add_order XML, which can later be used to send to a JobScheduler.

SOSProducer.java
package com.sos.jms.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class SOSProducer {
    private static final String DEFAULT_CONNECTION_URI = "tcp://[HOSTNAME]:61616";
    private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
    private static final String QUEUE_NAME = "test_queue";
    private static final String DEFAULT_TEXT = "<add_order job_chain='[FOLDERNAME]/[JOBCHAINNAME]' at='now'>"
            + "<params>"
            + "<param name='host' value='[HOST]'/>"
            + "<param name='port' value='22'/>"
            + "<param name='user' value='[USERNAME]'/>"
            + "<param name='auth_method' value='password'/>"
            + "<param name='password' value='[PASSWORD]'/>"
            + "<param name='command' value='echo command send over MessageQueue!'/>"
            + "</params>"
            + "</add_order>";
    
    private Connection createConnection(){
        return this.createConnection(DEFAULT_CONNECTION_URI);
    }
    
    public Connection createConnection(String uri){
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = null;
        try {
            connection = factory.createConnection();
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to connect: " , e);
        }
        return connection;
    }
    
    private Session createSession(Connection connection){
        Session session = null;
        try {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Session: " , e);
        }
        return session;
    }
    
    private Destination createDestination(Session session, String queueName){
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Destination: " , e);
        }
        return destination;
    }
    
    private MessageProducer createMessageProducer(Session session, Destination destination){
        MessageProducer producer = null;
        try {
            producer = session.createProducer(destination);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create MessageProducer: " , e);
        }        
        return producer;
    }
    
    public void write(String text){
        Connection connection = createConnection();
        Session session = createSession(connection);
        Destination destination = createDestination(session);
        MessageProducer producer = createMessageProducer(session, destination);
        Message message = null;
        try {
            if(text != null){
                message = session.createTextMessage(text);
            } else{
                message = session.createTextMessage(DEFAULT_TEXT);
            }
            producer.send(message);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.error("JMSException occurred while trying to close the connection: " , e);
                }
            }
        }
    }
}

The MessageConsumer

This example describes how to build a connection to a MQ server and receive a message from a queue on this server. Furthermore it describes how to build a TCP socket connection to send the received message to a JobScheduler.
This example uses Apache Active MQ as the MQ server.

  1. Create a connection.
  2. Create a session.
  3. Create a destination.
  4. Create a consumer.
  5. Receive a message with the consumer.
  6. Close the (MQ) connection.
  7. Open a TCP connection to a JobScheduler instance.
  8. Send the Message to the JobScheduler
  9. Close the (TCP) connection.

This part of the document will show examples for the last 5 steps as the first four steps are simmilar as the examples shown above for the instantiation of the MessageProducer.

The Methods

The createMessageConsumer(Session session, Destination destination) method

This method is called with an already active Session object as well as instantiated Destination object. It instantiates a MessageConsumer object with the given session and destination.

createMessageConsumer(Session session, Destination destination)
private MessageConsumer createMessageConsumer(Session session, Destination destination) {
    MessageConsumer consumer = null;
    try {
        consumer = session.createConsumer(destination);
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", e);
    }
    return consumer;
}

The read(MessageConsumer consumer) method

read()
private String read(MessageConsumer consumer) {
    TextMessage message = null;
    String textMessage = null;
    try {
        while (true) {
            Message receivedMessage = consumer.receive(1);
            if (receivedMessage != null) {
                if (receivedMessage instanceof TextMessage) {
                    message = (TextMessage) receivedMessage;
                    textMessage = message.getText();
                    LOGGER.info("Reading message: " + textMessage);
                    break;
                } else {
                    break;
                }
            }
        }
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
    }
    return textMessage;
}

 Don´t forget to clean up (call close()) after the message has been received.

The connect(String host, int port) method

This method instantiates a TCP socket connection to a JobScheduler instance with the given host:port.

connect()
public void connect(String host, int port) throws Exception {
    if (host == null || host.length() == 0) {
       throw (new Exception("hostname missing."));
    }
    if (port == 0){
       throw (new Exception("port missing."));
    }
    if ("udp".equalsIgnoreCase(PROTOCOL)) {
        udpSocket = new DatagramSocket();
        udpSocket.connect(InetAddress.getByName(HOST), PORT);
    } else {
        socket = new Socket(host, port);
        in = new DataInputStream(socket.getInputStream());
        out = new PrintWriter(socket.getOutputStream(), true);
    }
}

The disconnect() method

This method is used to close the socket opened with the above connect() method.

disconnect()
public void disconnect() throws Exception {
    if (socket != null) {
        socket.close();
    }
    if (in != null) {
        in.close();
    }
    if (out != null) {
        out.close();
    }
}

The sendRequest(String command) method

This method sends a request with the given command to the connected JobScheduler instance. The given command in the example is the add_order XML snippets shown in the SOSProducer example.

public void sendRequest(String command) throws Exception {
    if ("udp".equalsIgnoreCase(PROTOCOL)) {
        if (command.indexOf("<?xml") == -1) {
            command = "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n";
        }
        byte[] commandBytes = command.getBytes();
        udpSocket.send(new DatagramPacket(commandBytes, commandBytes.length, InetAddress.getByName(HOST), PORT));
    } else {
        if (command.indexOf("<?xml") == 0) {
            out.print(command + "\r\n");
        } else {
            out.print("<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n");
        }
        out.flush();
    }
}

The getResponse() method

THis message receives the response of the tcp connection described above.

public String getResponse() throws IOException, RuntimeException {
    int sec = 0;
    byte[] buffer = {};
    while (in.available() == 0) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            LOGGER.error("", e);
        }
        if (sec++ == TIMEOUT) {
            throw new RuntimeException("timeout reached");
        }
    }
    buffer = new byte[in.available()];
    int bytesRead = in.read(buffer);
    return new String(buffer, "ISO-8859-1");
}

The receiveFromQueue() method

The receiveFromQueue() method uses the methods described above to connect to a Host running a JobScheduler instance, read from a message queue, send the received message to the JobScheduler instance and close the session to the JobScheduler instance.

receiveFromQueue()
public String receiveFromQueue() {
    String message = null;
    try {
        connect();
        message = read();
        sendRequest(message);
        disconnect();
    } catch (Exception e) {
        LOGGER.error("Error occured while publishing to the JobScheduler instance host:" + HOST + ", port:" + PORT, e);
    }
    return message;
}

The SOSConsumer class

The code example below slightly differs from the examples above. In the class below the methode read() already uses the other method for instantiation, therfore it needs less parameters and it closes the connection itself.

SOSConsumer
package com.sos.jms.consumer;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class SOSConsumer {
    private static final String DEFAULT_CONNECTION_URI = "tcp://[HOST]:61616";
    private static final Logger LOGGER = Logger.getLogger(SOSConsumer.class);
    private static final String DEFAULT_QUEUE_NAME = "test_queue";
    private static final String HOST = "[JobScheduler_HOSTNAME]";
    private static final int PORT = [JobScheduler_PORT];
    private static final String PROTOCOL = "tcp";
    private static final int TIMEOUT = 5;
    private Socket socket = null;
    private DatagramSocket udpSocket = null;
    private DataInputStream in = null;
    private PrintWriter out = null;

    private Connection createConnection() {
        return this.createConnection(DEFAULT_CONNECTION_URI);
    }

    private Connection createConnection(String uri) {
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = null;
        try {
            connection = factory.createConnection();
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to connect: ", e);
        }
        return connection;
    }

    private Session createSession(Connection connection) {
        Session session = null;
        try {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Session: ", e);
        }
        return session;
    }

    private Destination createDestination(Session session) {
        return this.createDestination(session, DEFAULT_QUEUE_NAME);
    }
    private Destination createDestination(Session session, String queueName) {
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Destination: ", e);
        }
        return destination;
    }

    private MessageConsumer createMessageConsumer(Session session, Destination destination) {
        MessageConsumer consumer = null;
        try {
            consumer = session.createConsumer(destination);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", e);
        }
        return consumer;
    }

    private String read() {
        TextMessage message = null;
        String textMessage = null;
        Connection connection = createConnection();
        try {
            Session session = createSession(connection);
            Destination destination = createDestination(session);
            connection.start();
            MessageConsumer consumer = createMessageConsumer(session, destination);
            while (true) {
                Message receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        message = (TextMessage) receivedMessage;
                        textMessage = message.getText();
                        LOGGER.info("Reading message: " + textMessage);
                        break;
                    } else {
                        break;
                    }
                }
            }
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.error("JMSException occurred while trying to close the connection: " , e);
                }
            }
         }
        return textMessage;
    }

    public String receiveFromQueue() {
        String message = null;
        try {
            connect();
            message = read();
            sendRequest(message);
            disconnect();
        } catch (Exception e) {
            LOGGER.error("Error occured while publishing to the JobScheduler instance host:" + HOST + ", port:" + PORT, e);
        }
        return message;
    }

    public void connect(String host, int port) throws Exception {
        if (host == null || host.length() == 0) {
            throw (new Exception("hostname missing."));
        }
        if (port == 0){
            throw (new Exception("port missing."));
        }
        if ("udp".equalsIgnoreCase(PROTOCOL)) {
            udpSocket = new DatagramSocket();
            udpSocket.connect(InetAddress.getByName(HOST), PORT);
        } else {
            socket = new Socket(host, port);
            in = new DataInputStream(socket.getInputStream());
            out = new PrintWriter(socket.getOutputStream(), true);
        }
    }

    public void connect() throws Exception {
        this.connect(HOST, PORT);
    }

    /** sends a request to a JobScheduler
     *
     * @param command
     * @throws java.lang.Exception 
     **/
    public void sendRequest(String command) throws Exception {
        if ("udp".equalsIgnoreCase(PROTOCOL)) {
            if (command.indexOf("<?xml") == -1) {
                command = "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n";
            }
            byte[] commandBytes = command.getBytes();
            udpSocket.send(new DatagramPacket(commandBytes, commandBytes.length, InetAddress.getByName(HOST), PORT));
        } else {
            if (command.indexOf("<?xml") == 0) {
                out.print(command + "\r\n");
            } else {
                out.print("<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n");
            }
            out.flush();
        }
    }

    /** receives the response of the JobScheduler for the last sent request
     *
     * @return String response from JobScheduler
     * @throws IOException */
    public String getResponse() throws IOException, RuntimeException {
        int sec = 0;
        byte[] buffer = {};
        while (in.available() == 0) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                LOGGER.error("", e);
            }
            if (sec++ == TIMEOUT) {
                throw new RuntimeException("timeout reached");
            }
        }
        buffer = new byte[in.available()];
        int bytesRead = in.read(buffer);
        return new String(buffer, "ISO-8859-1");
    }

    public void disconnect() throws Exception {
        if (socket != null) {
            socket.close();
        }
        if (in != null) {
            in.close();
        }
        if (out != null) {
            out.close();
        }
    }
}
  • No labels