Introduction

This article describes an example implementation that uses a Java Message Queue Service to:

  • send and receive messages,
  • add an order to a JS7 workflow by a JS7 - REST Web Service API call with a JSON body extracted from a message.

The Message Queue Service used in this example is Apache ActiveMQ: https://activemq.apache.org/.

Mode of Operation

The implementation includes the following steps:

  • creation of a message producer to send messages to a Message Queue Service,
  • creation of a message consumer to receive messages from a Message Queue Service,
  • using the message as part of a JS7 REST Web Service API call.

The message is assumed to be a JSON snippet which is processed by the desired JS7 REST Web Service API. This snippet must be valid JSON and compliant with requests explained in the Technical Documentation of the REST Web Service API article.
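
As an illustration of such a JSON snippet, the following minimal sketch builds the request body for an /orders/add call from a Controller ID and a workflow path. The field names follow the example body used later in this article. The class name OrderRequestBody and its methods are hypothetical helpers for this sketch and not part of JS7.

```java
// Sketch: builds the JSON body for an /orders/add request to be used as message text.
// OrderRequestBody is a hypothetical helper, it is not part of JS7 or JMS.
final class OrderRequestBody {

    // Escapes the characters that must not appear unescaped inside a JSON string.
    static String escape(String value) {
        StringBuilder sb = new StringBuilder();
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // remaining control characters are written as 4-digit hex escapes
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Assembles the body with the same fields as the example message in this article.
    static String build(String controllerId, String workflowPath) {
        return "{\"controllerId\":\"" + escape(controllerId) + "\","
                + "\"orders\":[{\"workflowPath\":\"" + escape(workflowPath) + "\","
                + "\"scheduledFor\":\"now\"}],"
                + "\"auditLog\":{}}";
    }

    public static void main(String[] args) {
        System.out.println(build("testsuite", "/JS7Demo/01_HelloWorld/jdHelloWorld"));
    }
}
```

Escaping the values before concatenation keeps the snippet valid JSON even if a workflow path contains quotes or backslashes.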

Most of the implementation work is done with the standard JMS API of Java. The only class taken from the ActiveMQ implementation is the ActiveMQConnectionFactory class. Switching the implementation to the ConnectionFactory of your preferred Message Queue Service should not be too complex.

Download

A zip file of this example as a complete Maven project is available for download: js7-jms-example-project.zip.

Prerequisites

  • a Message Queue Service (the example makes use of ActiveMQ),
  • a JS7 Controller, Agent and JOC Cockpit,
  • Maven (required only if users want to build the example as a Maven project).

The MessageProducer

This example describes how to establish a connection to a Message Queue Service and send a message to a queue. The steps are:

  1. create a connection,
  2. create a session,
  3. create a destination,
  4. create a producer,
  5. send a message with the producer,
  6. close the connection.

Methods

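write(String text, String queueName, long ttl)

The method is called with the message to be sent to the server, the name of the target queue and the time to live of the message in milliseconds. The message is passed as a String object from which the method creates a TextMessage object. If no text is passed (null), the default message held in the TEXT constant is sent.

write(String text, String queueName, long ttl)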
    public void write(String text, String queueName, long ttl) throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(destination);
            // time to live of sent messages in milliseconds (5 seconds in this showcase)
            producer.setTimeToLive(ttl);
            Message message = null;
            if(text != null){
                message = session.createTextMessage(text);
            } else{
                message = session.createTextMessage(TEXT);
            }
            producer.send(message);
        } catch (Throwable e) {
            LOGGER.error("Exception occurred while trying to write Message to Destination: ", e);
            throw e;
        } finally {
            if(session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: ", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
                }
            }
        }
    }

The SOSProducer class

The code example below shows the complete class. If no text is passed to the write() method, the class falls back to the TEXT constant which holds the body for a JS7 /orders/add API request.

SOSProducer.java
package com.sos.jms.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;


public class SOSProducer {

    private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
    private static final String TEXT = "{\"controllerId\":\"testsuite\",\"orders\":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
    private String uri;
    
    public SOSProducer(String uri) {
        this.uri = uri;
    }
    
    public void write(String text, String queueName) throws Exception {
        write(text, queueName, 5000L);
    }
    
    public void write(String text, String queueName, long ttl) throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(destination);
            // time to live of sent messages in milliseconds (5 seconds in this showcase)
            producer.setTimeToLive(ttl);
            Message message = null;
            if(text != null){
                message = session.createTextMessage(text);
            } else{
                message = session.createTextMessage(TEXT);
            }
            producer.send(message);
        } catch (Throwable e) {
            LOGGER.error("Exception occurred while trying to write Message to Destination: ", e);
            throw e;
        } finally {
            if(session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: ", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
                }
            }
        }
    }
    
}

The MessageConsumer

This section describes how to establish a connection to a Message Queue Service and receive a message from a queue. Furthermore it shows how to connect to a JOC Cockpit instance via HTTP to send an API request:

  1. create a connection,
  2. create a session,
  3. create a destination,
  4. create a consumer,
  5. receive a message with the consumer,
  6. close the (MQ) connection,
  7. log in to a JOC Cockpit instance via an HTTP REST API call,
  8. send an /orders/add API request to the JOC Cockpit instance,
  9. log out and close the HTTP connection.

Methods

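read(String queueName)

The method instantiates a MessageConsumer object to receive a message from the given queue of the Message Queue Service and polls the queue until a message arrives. If the message is a TextMessage, the method returns its content as a String via the TextMessage object's getText() method. For any other message type the method returns null.

read(String queueName)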
    public String read(String queueName) throws Exception {
        TextMessage message = null;
        String textMessage = null;
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            connection.start();
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                Message receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        message = (TextMessage) receivedMessage;
                        textMessage = message.getText();
                        LOGGER.info("Reading message: " + textMessage);
                        break;
                    } else {
                        break;
                    }
                }
            }
        } catch (Throwable e) {
            LOGGER.error("Exception occurred while trying to read from Destination: ", e);
            throw e;
        } finally {
            if(session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: ", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
                }
            }
         }
        return textMessage;
    }
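
The loop in the read() method polls until a message arrives. Where a caller should give up after some time, the polling can be bounded by a deadline. The following sketch illustrates the control flow only: it uses an in-memory BlockingQueue as a stand-in for the JMS MessageConsumer, with poll(timeout) taking the role of receive(timeout). The class BoundedPollSketch, the method pollUntil and its parameters are hypothetical names and not part of the example project.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch: deadline-bounded polling, with a BlockingQueue standing in for a JMS consumer.
final class BoundedPollSketch {

    // Polls in slices of sliceMillis until a message arrives or maxWaitMillis has elapsed.
    // Returns null on timeout, similar to receive(timeout) returning null.
    static String pollUntil(BlockingQueue<String> queue, long sliceMillis, long maxWaitMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        try {
            while (deadline - System.nanoTime() > 0) {
                String message = queue.poll(sliceMillis, TimeUnit.MILLISECONDS);
                if (message != null) {
                    return message;
                }
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag and treat the interruption like a timeout
            Thread.currentThread().interrupt();
        }
        return null;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("{\"controllerId\":\"testsuite\"}");
        System.out.println(pollUntil(queue, 10, 200)); // a message is available
        System.out.println(pollUntil(queue, 10, 200)); // the queue is empty now
    }
}
```

With a JMS consumer the same structure applies if queue.poll(...) is replaced by consumer.receive(sliceMillis), so that a caller can distinguish "no message within the time limit" from a received message.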

The SOSConsumer class

The code example below shows the complete class. 

SOSConsumer
package com.sos.jms.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SOSConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SOSConsumer.class);
    private String uri;
    
    public SOSConsumer (String uri) {
        this.uri = uri;
    }
    
    public String read(String queueName) throws Exception {
        TextMessage message = null;
        String textMessage = null;
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            connection.start();
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                Message receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        message = (TextMessage) receivedMessage;
                        textMessage = message.getText();
                        LOGGER.info("Reading message: " + textMessage);
                        break;
                    } else {
                        break;
                    }
                }
            }
        } catch (Throwable e) {
            LOGGER.error("Exception occurred while trying to read from Destination: ", e);
            throw e;
        } finally {
            if(session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: ", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
                }
            }
         }
        return textMessage;
    }

}

Java Class with a main(String[] args) method (example JmsExecute.java)

The Java class makes use of the SOSProducer to create a message and send it to the message queue. It then uses the SOSConsumer class to read the message from the queue. In addition, it creates an HTTP connection to a JS7 JOC Cockpit instance to call the /orders/add API with the JSON body from the message received.

The example class uses the SOSRestApiClient to create the HTTP connection. The SOSRestApiClient is based on the Apache HttpComponents HttpClient (org.apache.httpcomponents:httpclient). Users can use their own HTTP client implementation.
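
For users who prefer a different HTTP client, the following minimal sketch builds the three requests (login, add order, logout) with the java.net.http API available since Java 11. It is an assumption-based alternative and not the SOSRestApiClient implementation. The relative paths and header names are taken from the JmsExecute class in this article, while the class name JocRequestSketch and the host and port in the main method are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch: builds the HTTP requests of the login, add order and logout sequence.
final class JocRequestSketch {

    // Value of the Authorization header for HTTP Basic authentication.
    static String basicAuth(String username, String password) {
        String credentials = username + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    // Login: POST without a body, authenticated with Basic credentials.
    static HttpRequest login(URI jocApi, String username, String password) {
        return HttpRequest.newBuilder(jocApi.resolve("authentication/login"))
                .header("Authorization", basicAuth(username, password))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Add order: POST the consumed message as JSON body, carrying the access token.
    static HttpRequest addOrder(URI jocApi, String accessToken, String jsonBody) {
        return HttpRequest.newBuilder(jocApi.resolve("orders/add"))
                .header("X-Access-Token", accessToken)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Logout: POST without a body, carrying the access token.
    static HttpRequest logout(URI jocApi, String accessToken) {
        return HttpRequest.newBuilder(jocApi.resolve("authentication/logout"))
                .header("X-Access-Token", accessToken)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // hypothetical base URL, it has to end with a slash for resolve() to append the path
        URI jocApi = URI.create("http://localhost:4446/joc/api/");
        System.out.println(login(jocApi, "root", "root").uri());
        System.out.println(addOrder(jocApi, "some-token", "{}").uri());
        System.out.println(logout(jocApi, "some-token").uri());
    }
}
```

Each request can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). The access token for the second and third request is read from the accessToken property of the login response, as in the JmsExecute class.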

JmsExecute.java
package com.sos.jms;

import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;
import java.util.Properties;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sos.commons.httpclient.SOSRestApiClient;
import com.sos.jms.consumer.SOSConsumer;
import com.sos.jms.producer.SOSProducer;

public class JmsExecute {

    private static final String DEFAULT_JMS_URI = "tcp://activemq-5-15:61616";
    private static final String DEFAULT_JOC_API_URL = "http://centostest_primary.sos:7446/joc/api/";
    private static final String DEFAULT_JOC_API_REQUEST_BODY = "{\"controllerId\":\"testsuite\",\"orders\""
            + ":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
    private static final String DEFAULT_USERNAME = "root";
    private static final String DEFAULT_PWD = "root";
    private static final String DEFAULT_QUEUE_NAME = "test_queue";
    private static final String API_ADD_ORDER = "orders/add";
    private static final String API_LOGIN = "authentication/login";
    private static final String API_LOGOUT = "authentication/logout";
    private static final String ACCESS_TOKEN_HEADER = "X-Access-Token";
    private static final String APPLICATION_JSON = "application/json";
    private static final String CONTENT_TYPE = "Content-Type";
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsExecute.class);
    private static String jmsServerUri = null;
    private static String jocApiUri = null;
    private static String controllerId = null;
    private static String workflowPath = null;
    private static String requestBody = null;
    private static String username = null;
    private static String pwd = null;
    private static String queueName = null;
    private static Long queueTtl = null;

    public static void main(String[] args) throws URISyntaxException {
        SOSRestApiClient client = null;
        try {
            URL classUrl = JmsExecute.class.getProtectionDomain().getCodeSource().getLocation();
            Path classPath = Paths.get(classUrl.toURI());
            String filename = classPath.getFileName().toString().replace(".jar", ".config");
            LOGGER.info(classPath.getParent().resolve(filename).toString());
            readPropertiesFile(classPath.getParent().resolve(filename));
            if ("produce".equals(args[0])) {
                SOSProducer producer = new SOSProducer(jmsServerUri);
                LOGGER.info("message sent to queue:");
                LOGGER.info(requestBody);
                producer.write(requestBody, queueName, queueTtl);
            } else if ("consume".equals(args[0])) {
                SOSConsumer consumer = new SOSConsumer(jmsServerUri);
                String consumedMessage = null;
                consumedMessage = consumer.read(queueName);
                LOGGER.info("message received from queue:");
                LOGGER.info(consumedMessage);
                if (consumedMessage != null) {
                    client = setupHttpClient(username, pwd);
                    URI jocUri = URI.create(jocApiUri);
                    LOGGER.info("send login to: " + jocUri.resolve(API_LOGIN).toString());
                    String response = client.postRestService(jocUri.resolve(API_LOGIN), null);
                    LOGGER.info("HTTP status code: " + client.statusCode());
                    if (client.statusCode() == 200) {
                        JsonReader jsonReader = null;
                        String accessToken = null;
                        try {
                            jsonReader = Json.createReader(new StringReader(response));
                            JsonObject json = jsonReader.readObject();
                            accessToken = json.getString("accessToken", "");
                        } catch (Exception e) {
                            throw new Exception("Could not determine accessToken.", e);
                        } finally {
                            // the reader is null if Json.createReader() has failed
                            if (jsonReader != null) {
                                jsonReader.close();
                            }
                        }
                        client.addHeader(ACCESS_TOKEN_HEADER, accessToken);
                        client.addHeader(CONTENT_TYPE, APPLICATION_JSON);
                        LOGGER.info("REQUEST: " + API_ADD_ORDER);
                        LOGGER.info("PARAMS: " + consumedMessage);
                        // API_ADD_ORDER is a relative path, prepend the base URL of the JOC Cockpit API
                        String apiUrl = API_ADD_ORDER;
                        if (!API_ADD_ORDER.toLowerCase().startsWith(jocApiUri)) {
                            apiUrl = jocApiUri + API_ADD_ORDER;
                        }
                        LOGGER.info("resolvedUri: " + jocUri.resolve(apiUrl).toString());
                        response = client.postRestService(jocUri.resolve(apiUrl), consumedMessage);
                        LOGGER.info("HTTP status code: " + client.statusCode());
                        response = client.postRestService(jocUri.resolve(API_LOGOUT), null);
                        LOGGER.info("HTTP status code: " + client.statusCode());
                    }
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        } finally {
            if (client != null) {
                client.closeHttpClient();
            }
        }
    }

    private static SOSRestApiClient setupHttpClient(String username, String password) {
        SOSRestApiClient client = new SOSRestApiClient();
        // the basic encoder is used as the MIME encoder inserts line breaks into long values
        String basicAuth = Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
        client.setBasicAuthorization(basicAuth);
        return client;
    }

    private static String cleanupValue(String value) {
        value = value.trim();
        if (value.startsWith("\"")) {
            value = value.substring(1);
        }
        if (value.endsWith("\"")) {
            value = value.substring(0, value.length() - 1);
        }
        return value;
    }

    private static void readPropertiesFile(Path path) {
        Properties props = new Properties();
        try {
            props.load(Files.newInputStream(path));
            jmsServerUri = cleanupValue(props.getProperty("jms_url"));
            LOGGER.info("cfg jms_url: " + jmsServerUri);
            queueName = cleanupValue(props.getProperty("jms_queue_name"));
            LOGGER.info("cfg jms_queue_name: " + queueName);
            queueTtl = Long.parseLong(cleanupValue(props.getProperty("jms_queue_ttl")));
            LOGGER.info("cfg jms_queue_ttl: " + queueTtl.toString());
            jocApiUri = cleanupValue(props.getProperty("joc_api_url"));
            LOGGER.info("cfg joc_api_url: " + jocApiUri);
            controllerId = cleanupValue(props.getProperty("controller_id"));
            LOGGER.info("cfg controller_id: " + controllerId);
            workflowPath = cleanupValue(props.getProperty("workflow_path"));
            LOGGER.info("cfg workflow_path: " + workflowPath);
            username = cleanupValue(props.getProperty("username"));
            pwd = cleanupValue(props.getProperty("password"));
            requestBody = "{\"controllerId\":\"" + controllerId + "\",\"orders\":[{\"workflowPath\":\"" + workflowPath
                    + "\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
        } catch (IOException e) {
            LOGGER.warn("could not read properties file, using defaults instead.");
            jmsServerUri = DEFAULT_JMS_URI;
            queueName = DEFAULT_QUEUE_NAME;
            jocApiUri = DEFAULT_JOC_API_URL;
            requestBody = DEFAULT_JOC_API_REQUEST_BODY;
            username = DEFAULT_USERNAME;
            pwd = DEFAULT_PWD;
            queueTtl = 5000L; 
        }
    }

}
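
The main() method looks for a configuration file that is located next to the .jar file and that uses the same name with the .config extension. The following sketch shows what the content of such a file might look like and how the values are parsed. The property names are those read by the readPropertiesFile() method, while the values are placeholders. The class ConfigSketch and the fallback parameter of its cleanup() method are hypothetical additions and not part of the example project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch: parses sample configuration content in the Java properties format.
final class ConfigSketch {

    // Sample file content: keys as read by readPropertiesFile(), values are placeholders.
    static final String SAMPLE = String.join("\n",
            "jms_url = \"tcp://localhost:61616\"",
            "jms_queue_name = \"test_queue\"",
            "jms_queue_ttl = 5000",
            "joc_api_url = \"http://localhost:4446/joc/api/\"",
            "controller_id = \"testsuite\"",
            "workflow_path = \"/JS7Demo/01_HelloWorld/jdHelloWorld\"",
            "username = \"root\"",
            "password = \"root\"");

    // Trims the value and strips surrounding double quotes.
    // Returns the fallback if the property is missing.
    static String cleanup(String value, String fallback) {
        if (value == null) {
            return fallback;
        }
        value = value.trim();
        if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
            value = value.substring(1, value.length() - 1);
        }
        return value;
    }

    // Loads the properties from a string instead of a file to keep the sketch self-contained.
    static Properties parse(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(SAMPLE);
        System.out.println(cleanup(props.getProperty("jms_url"), "tcp://localhost:61616"));
        System.out.println(Long.parseLong(cleanup(props.getProperty("jms_queue_ttl"), "5000")));
    }
}
```

Passing a fallback value avoids a NullPointerException if a property is missing from the file, which the cleanupValue() method of the JmsExecute class does not guard against.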


Maven Configuration Example

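This example shows the ActiveMQ dependency required to build the example above as a Maven project. The JmsExecute class additionally requires the libraries that provide the SOSRestApiClient, javax.json and SLF4J classes on the classpath.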

Maven Configuration
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.sos-berlin</groupId>
	<artifactId>activeMQ-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.15.0</version>
		</dependency>
	</dependencies>
</project>