The magic of Interfaces: use JMS to create Queue/Topic producers and consumers

Removing particular implementations and using interfaces (for JMS or other features) improves the independence and flexibility of your application: with a few changes you can switch providers, APIs or products without modifying your business/main logic.

A few weeks ago I had a requirement to create a message producer for a TIBCO-EMS topic. It was a very simple task.
The problem started when I needed to check whether the sent messages were on the topic… and I didn’t have access to the topic’s admin console.
So I started to research how to create not only a message producer but also a message consumer. I found a lot of examples on the Internet, but most of them depended on a specific queue/topic implementation and didn’t use the JMS interfaces.
Finally, I created a producer/consumer Java program for the TIBCO-EMS implementation. My co-workers found it very useful for testing sent and received messages and asked me if I could “configure” it for IBM-MQ queues.
The original program became a Spring Boot application that removes the provider-specific implementations and uses JMS interfaces for several operations. The next step was to turn the local project into a web application with a very simple (and ugly) interface to test the send, browse/consume and remove operations on different configured topics and queues: the JmsToolkitWeb.
If you have found a better solution for one or more features, or you want to share your best practices with us, please let me know and I will update this post 🙂

Some tips

  • The application uses the Java JMS interfaces, so you don’t depend on the implementation of the queue/topic provider
  • You only need to create a JMS ConnectionFactory bean using the connection factory provided by your selected queue/topic driver
  • You don’t need access to the administrator console to perform operations on the messages
  • Includes RESTful endpoints for the different queue/topic operations
  • Creates initial topic subscribers for the configured topics when the application starts (you don’t need to create them through the administrator console)
  • Developed with Spring Boot 1.5.3.RELEASE (and also tested with 1.4.2.RELEASE)
  • Configured by default to use Apache ActiveMQ
  • Tested with the TIBCO-EMS, IBM-MQ and Apache ActiveMQ providers
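
The first two tips can be illustrated without any broker on the classpath. The sketch below is only a toy under stated assumptions: MessageChannel, MessageChannelFactory, InMemoryChannelFactory, ConsoleChannelFactory and GreetingService are hypothetical names that stand in for a JMS producer, javax.jms.ConnectionFactory, two provider drivers and a business service. None of them belong to JMS or to JmsToolkitWeb.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a JMS producer bound to one destination.
interface MessageChannel {
    void send(String body);
}

// Hypothetical stand-in for javax.jms.ConnectionFactory: the only type the business code sees.
interface MessageChannelFactory {
    MessageChannel open(String destinationName);
}

// First hypothetical "provider": keeps the messages in memory, per destination.
final class InMemoryChannelFactory implements MessageChannelFactory {
    private final Map<String, List<String>> store = new HashMap<>();

    @Override
    public MessageChannel open(String destinationName) {
        List<String> messages = store.computeIfAbsent(destinationName, name -> new ArrayList<>());
        return messages::add;
    }

    // Returns a copy of the messages sent to the destination so far.
    List<String> messagesOn(String destinationName) {
        List<String> messages = store.get(destinationName);
        if (messages == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(messages);
    }
}

// Second hypothetical "provider": prints every message to the console.
final class ConsoleChannelFactory implements MessageChannelFactory {
    @Override
    public MessageChannel open(String destinationName) {
        return body -> System.out.println(destinationName + " <- " + body);
    }
}

// Business logic: depends on the interface only, never on a provider class.
final class GreetingService {
    private final MessageChannelFactory factory;

    GreetingService(MessageChannelFactory factory) {
        this.factory = factory;
    }

    void greet(String destinationName, String name) {
        factory.open(destinationName).send("Hello, " + name);
    }
}

final class ProviderSwapDemo {
    public static void main(String[] args) {
        // Swapping the provider only changes the object passed to the constructor.
        InMemoryChannelFactory inMemory = new InMemoryChannelFactory();
        new GreetingService(inMemory).greet("sample.queue", "JMS");
        System.out.println(inMemory.messagesOn("sample.queue"));

        new GreetingService(new ConsoleChannelFactory()).greet("sample.queue", "JMS");
    }
}
```

In the real application the Spring bean that returns the ConnectionFactory plays the role of the constructor argument here: it is the single place where a provider class is named.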

Configuration

  • TIBCO-EMS
    • add the Maven dependency to pom.xml
    • create a bean for the JMS ConnectionFactory using new com.tibco.tibjms.TibjmsConnectionFactory(url)
    • set the specific values using a JmsTemplate and assign the ConnectionFactory to this JmsTemplate
    • inject the JmsTemplate to get the ConnectionFactory in your services (for example, with the @Autowired annotation)
  • IBM-MQ
    • add the Maven dependency to pom.xml
    • create a bean for the JMS ConnectionFactory using new com.ibm.mq.jms.MQConnectionFactory()
    • set the specific values for the MQ connection factory
    • inject the ConnectionFactory in your services (for example, with the @Autowired annotation)
  • Apache ActiveMQ
    • add the Maven dependency to pom.xml
    • create a bean for the JMS ConnectionFactory using new ActiveMQConnectionFactory()
    • set the specific values for the ActiveMQ connection factory
    • inject the ConnectionFactory in your services (for example, with the @Autowired annotation)

Example for ActiveMQ

Add the Maven dependencies

Add the Spring Boot ActiveMQ starter dependency to your pom.xml

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>

Create a JMS ConnectionFactory Bean

Here you create your queue/topic connection factory using your provider’s method(s). The other classes only use JMS interfaces, so if you need to change your provider in the future you will only have to modify this bean 🙂

package com.example.jms.config;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

/**
 * Configuration class for JMS beans.
 * @author Gabriel
 *
 */
@Configuration
@ConfigurationProperties("com.example.jms.config.activemq")
@Profile("default")
public class JmsLocalConfig {

	private String url;
	private String user;
	private String password;
	
	/**
	 * Creates a {@link ConnectionFactory} for your configured JMS provider.
	 * @return
	 */
	@Bean
	public ConnectionFactory createConnectionFactoryBean() {
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
		connectionFactory.setBrokerURL(url);
		connectionFactory.setUserName(ActiveMQConnection.DEFAULT_USER); // or connectionFactory.setUserName(user);
		connectionFactory.setPassword(ActiveMQConnection.DEFAULT_PASSWORD); // or connectionFactory.setPassword(password);
		return connectionFactory;
	}
	
	// methods for getters and setters
}	

Create a JMS Queue producer

package com.example.jms.service.queue;

import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Services for JMS queues.
 * @author Gabriel
 *
 */
@Service
public class QueueService {

	private static final Logger LOGGER = LoggerFactory.getLogger(QueueService.class);
	
	private static final int RECEIVE_TIME_OUT_MILISECONDS = 5000;
	
	@Autowired
	ConnectionFactory connectionFactory;

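	/**
	 * Sends the given text message to the given queue.
	 * @param queueDestinationName name of the target queue
	 * @param message text body of the message
	 * @return true if the message was sent, false if any error occurred
	 */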
	public boolean sendMessageTo(final String queueDestinationName, final String message) {
		LOGGER.info("sendMessageTo - start");
		
		Connection connection = null;
		Session session = null;

		try {
			connection = connectionFactory.createConnection();
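			// A client ID is not required for a queue producer; because this one is fixed,
			// the provider may reject a second connection that uses it at the same time
			connection.setClientID("SendQueueSample");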

			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue(queueDestinationName);

			session.createProducer(destination).send(session.createTextMessage(message));
			return true;
		} catch (Exception e) {
			LOGGER.error("ERROR : ", e);
			return false;
		} finally {
			closeSession(session);
			closeConnection(connection);
			LOGGER.info("sendMessageTo - end");
		}
	}
}
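
The closeSession and closeConnection helpers called in the finally block are not shown in this post. Below is a minimal sketch of what such a helper might look like, assuming a JMS 2.0 API, where Connection, Session and MessageConsumer extend java.lang.AutoCloseable (on a JMS 1.1 API you would need one typed overload per resource instead). QuietCloser and closeQuietly are hypothetical names, not the ones used in the repository, and the sketch logs with java.util.logging only to stay free of dependencies.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class QuietCloser {

    private static final Logger LOGGER = Logger.getLogger(QuietCloser.class.getName());

    private QuietCloser() {
        // static helper, not meant to be instantiated
    }

    /**
     * Closes the resource if it is not null and logs (instead of rethrowing) any failure,
     * so that a problem while closing never hides the result of the main operation.
     *
     * @return true if the resource was null or closed cleanly, false if close() threw
     */
    static boolean closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return true;
        }
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Could not close resource", e);
            return false;
        }
    }
}
```

With a helper like this, the finally block would close the resources in the reverse order of their creation: consumer first, then session, then connection.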

Create a JMS Topic producer

package com.example.jms.service.topic;

import java.util.HashMap;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.example.jms.enums.ResourcesEnum;

/**
 * Services for topics.
 * @author Gabriel
 *
 */
@Service
public class TopicService {

	private static final Logger LOGGER = LoggerFactory.getLogger(TopicService.class);
	
	private static final String SUBSCRIBER_NAME = "theSubscriber";
	private static final int RECEIVE_TIME_OUT_MILISECONDS = 5000;
	
	private Map<String,String> subscribers = new HashMap<>();
	
	@Autowired
	ConnectionFactory connectionFactory;
	
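	/**
	 * Sends the given text message to the given topic.
	 * @param topicDestinationName name of the target topic
	 * @param message text body of the message
	 * @return true if the message was sent, false if any error occurred
	 */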
	public boolean sendMessageTo(final String topicDestinationName, final String message) {
        LOGGER.info("sendMessageTo - start");
        
        Connection connection = null;
        Session session = null;
    
        try {
            connection = connectionFactory.createConnection();
            connection.setClientID(topicDestinationName);
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createTopic(topicDestinationName);            
            
            session.createProducer(destination).send(session.createTextMessage(message));
            
            LOGGER.debug("sendMessageTo - message sent ok to : {}", topicDestinationName);
            LOGGER.debug("sendMessageTo - message sent as    : {}", SUBSCRIBER_NAME);
            LOGGER.debug("sendMessageTo - message body       : {}", message);
            
            return true;
        } catch (Exception e) {
        	LOGGER.error("ERROR", e);
            return false;
        } finally {
            closeSesion(session);
            closeConnection(connection);                        
            LOGGER.info("sendMessageTo - end");
        }
	}
}

Create a JMS Queue consumer

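	/**
	 * Removes/consumes the messages from the given queue.
	 * For testing purposes only: this operation would normally be performed through the queue's administrator console.
	 * @param queueDestinationName name of the queue to empty
	 * @return true if the messages were consumed without errors, false otherwise
	 */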
	public boolean removeMessagesFrom(final String queueDestinationName) {
		LOGGER.info("removeMessagesFrom - start");
		
		Connection connection = null;
		Session session = null;
		MessageConsumer messageConsumer = null; 
				
		try {
			connection = connectionFactory.createConnection();
			connection.setClientID("ConsumeQueueSample");
			connection.start();
			
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue(queueDestinationName);

			messageConsumer = session.createConsumer(destination);
			Message receivedMessage = messageConsumer.receive(RECEIVE_TIME_OUT_MILISECONDS);

			while (receivedMessage != null) {
				receivedMessage = messageConsumer.receive(RECEIVE_TIME_OUT_MILISECONDS);
			}

			return true;
		} catch (Exception e) {
			LOGGER.error("ERROR : ", e);
			return false;
		} finally {
			closeMessageConsumer(messageConsumer);
			closeSession(session);
			closeConnection(connection);
			LOGGER.info("removeMessagesFrom - end");
		}
	}
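
The loop above relies on receive(timeout) returning null once no message arrives within the timeout. The sketch below reproduces that pattern with a java.util.concurrent.BlockingQueue as an in-memory stand-in for the MessageConsumer (its poll(timeout, unit) also returns null on timeout), and it counts the removed messages, which the original method does not do. DrainSketch and drain are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class DrainSketch {

    private DrainSketch() {
        // static helper, not meant to be instantiated
    }

    /**
     * Takes elements until none arrives within the timeout and returns how many were removed.
     * The queue is a stand-in for a JMS MessageConsumer, not a real one.
     */
    static int drain(BlockingQueue<String> source, long timeoutMillis) {
        int removed = 0;
        try {
            while (source.poll(timeoutMillis, TimeUnit.MILLISECONDS) != null) {
                removed++;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report what was removed so far
            Thread.currentThread().interrupt();
        }
        return removed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first");
        queue.add("second");
        System.out.println("removed: " + drain(queue, 50));
    }
}
```

Note that this kind of loop always waits one full timeout after the last message, so with the 5000 ms used in the service every call takes at least five seconds.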

Create a JMS Topic subscriber

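	/**
	 * Removes/consumes the pending messages of every registered durable subscriber of the given topic.
	 * For testing purposes only: this operation would normally be performed through the topic's administrator console.
	 * @param topicDestinationName name of the topic whose subscriptions are emptied
	 * @return true if the messages were consumed without errors, false otherwise
	 */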
	public boolean removeMessagesBySubscriberFrom(final String topicDestinationName) {
		LOGGER.info("removeMessagesBySubscriberFrom - start");
		
        Connection connection = null;
        Session session = null;
        
        try {
            connection = connectionFactory.createConnection();
            connection.setClientID(topicDestinationName);
            connection.start();
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic(topicDestinationName);            

            for (String subscriber : subscribers.keySet()) {
                if (subscribers.get(subscriber).equals(topicDestinationName)) {
                    LOGGER.debug("removeMessagesBySubscriberFrom - topic : {} - subscriber : {}", topicDestinationName, subscriber);

                    MessageConsumer messageConsumer = session.createDurableSubscriber(destination, subscriber);
                    Message receivedMessage = messageConsumer.receive(RECEIVE_TIME_OUT_MILISECONDS);

                    while (receivedMessage != null) {
                        receivedMessage = messageConsumer.receive(RECEIVE_TIME_OUT_MILISECONDS);
                    }

                    closeMessageConsumer(messageConsumer);
                }
            }
            return true;
        } catch (Exception e) {
        	LOGGER.error("ERROR", e);
            return false;
        } finally {
            closeSesion(session);
            closeConnection(connection);  
            LOGGER.info("removeMessagesBySubscriberFrom - end");
        }
	}
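
As the loop reads it, the subscribers map is keyed by subscriber name and holds the topic each subscriber is registered on. A small sketch of that lookup in isolation, with no JMS involved; SubscriberLookupSketch and subscribersFor are hypothetical names, and the sample entries are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SubscriberLookupSketch {

    private SubscriberLookupSketch() {
        // static helper, not meant to be instantiated
    }

    /** Returns the names of the subscribers registered on the given topic. */
    static List<String> subscribersFor(Map<String, String> subscribers, String topicName) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, String> entry : subscribers.entrySet()) {
            if (entry.getValue().equals(topicName)) {
                names.add(entry.getKey());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // subscriber name -> topic name (made-up sample data)
        Map<String, String> subscribers = new LinkedHashMap<>();
        subscribers.put("theSubscriber-orders", "orders.topic");
        subscribers.put("theSubscriber-audit", "audit.topic");
        System.out.println(subscribersFor(subscribers, "orders.topic"));
    }
}
```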

Create a JMS Queue browser

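	/**
	 * Browses the given queue and returns its messages without consuming them.
	 * @param queueDestinationName name of the queue to browse
	 * @return the message bodies keyed by their position in the browse order (starting at 1); on error, whatever was read so far
	 */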
	public Map<String, String> browseMessagesFrom(final String queueDestinationName) {
		LOGGER.info("browseMessagesFrom - start");
		
		Map<String, String> queueMessages = new HashMap<>();
		Connection connection = null;
		Session session = null;
		
		try {
			connection = connectionFactory.createConnection();
			connection.setClientID("BrowseQueueSample");
			connection.start();
			
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Queue destination = session.createQueue(queueDestinationName);

			int elements = 1;

			@SuppressWarnings("unchecked")
			Enumeration<Message> messages = session.createBrowser(destination).getEnumeration();

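			while (messages != null && messages.hasMoreElements()) {
				Message message = messages.nextElement();
				// Only a TextMessage carries a text body; fall back to toString() for other message types
				String textMessage = (message instanceof TextMessage)
						? ((TextMessage) message).getText()
						: String.valueOf(message);
				queueMessages.put(String.valueOf(elements++), textMessage);
				
				LOGGER.debug("browseMessagesFrom - message : {}", textMessage);
			}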
			
			return queueMessages;
		} catch (Exception e) {
			LOGGER.error("ERROR : ", e);
			return queueMessages;
		} finally {
			closeSession(session);
			closeConnection(connection);
			LOGGER.info("browseMessagesFrom - end");
		}
	}
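
One detail worth noting about the result: a HashMap does not guarantee iteration order, so the position keys may not come back as 1, 2, 3 when the map is iterated or serialized. Below is a minimal sketch of the same numbering step with a LinkedHashMap, which keeps insertion order. It works on an Enumeration of plain strings instead of a real QueueBrowser, and BrowseResultSketch and numberInOrder are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;

final class BrowseResultSketch {

    private BrowseResultSketch() {
        // static helper, not meant to be instantiated
    }

    /** Numbers the bodies from 1 in the order the enumeration yields them. */
    static Map<String, String> numberInOrder(Enumeration<String> bodies) {
        // LinkedHashMap iterates in insertion order, unlike HashMap
        Map<String, String> result = new LinkedHashMap<>();
        int position = 1;
        while (bodies != null && bodies.hasMoreElements()) {
            result.put(String.valueOf(position++), bodies.nextElement());
        }
        return result;
    }

    public static void main(String[] args) {
        Enumeration<String> bodies = Collections.enumeration(Arrays.asList("a", "b", "c"));
        System.out.println(numberInOrder(bodies));
    }
}
```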

Final notes
Removing particular implementations and using interfaces (for JMS or other features) improves the independence and flexibility of your application: with a few changes you can switch providers, APIs or products without changing your business/main logic.

You can get the complete code from my public GitHub repository:
https://github.com/Gabotto/JmsToolkitWebApplication

Let me know if you have any problems, comments or new ideas:
WordPress: https://gabelopment.wordpress.com/
Email: gabelopment@gmail.com

You can also find me on Upwork

See you soon with more development notes…

Author: Gabi

Senior Backend Software Engineer with more than 20 years of experience working as a Developer, Architect / Software Engineer, Technical Lead and Team Manager for industries such as TELCO, Media, Insurance, Finance and AI. I'm a "fan" of the Agile methodology and development patterns & principles like KISS, YAGNI or DRY. I really enjoy being an SME in my projects, studying and learning new technologies, researching how to solve problems in an innovative, optimal and simple way, following and sharing good practices, clean code and software design principles & patterns with my teams and co-workers. I love to build effective teams in a collaborative work environment!
