ActiveMQ embebido en Java

De ChuWiki

Para trabajar con ActiveMQ no es necesaria bajárselo, instalarlo y arrancarlo. Si en un mismo programa Java lo usamos de una forma concreta, en nuestra misma máquina virtual se crearán las colas de mensajes propios de ActiveMQ y podremos usarlo para enviarnos mensajes entre las distintas clases de nuestro programa. Veamos un ejemplo sencillo.


Dependencias[editar]

Para nuestro ejemplo necesitamos los siguientes jar en nuestro proyecto

activemq-broker-5.11.1.jar
activemq-client-5.11.1.jar
activemq-openwire-legacy-5.11.1.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
geronimo-jms_1.1_spec-1.1.1.jar
hawtbuf-1.11.jar
log4j-1.2.17.jar
slf4j-api-1.7.10.jar
slf4j-log4j12-1.7.12.jar

que puedes obtener bajándote el zip de ActiveMQ y buscándolas en el subdirectorio lib, o bien si usas Maven, poniendo este par de dependencias en tu fichero pom.xml

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-client</artifactId>
	<version>5.11.1</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-broker</artifactId>
	<version>5.11.1</version>
</dependency>

Crear el broker[editar]

Un Broker en ActiveMQ es un servidor de ActiveMQ ejecutándose al que nos podemos conectar para enviar y recibir mensajes. Este Broker se puede arrancar como un ejecutable independiente si nos bajamos ActiveMQ y ejecutamos el script adecuado que está dentro del directorio bin de la instalación. Sin embargo, es posible arrancarlo también desde nuestro programa Java de forma explícita, instanciando la clase BrokerService. Pero aún tenemos otra opción mejor, no necesitamos ni siquiera crearlo explícitamente. Nos basta con establecer una conexión contra ActiveMQ usando el protocolo o componente "vm". Este creará automáticamente el Broker si no existe ya.

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

Connection connection = connectionFactory.createConnection();
connection.start();

Analicemos la cadena de conexión que hemos puesto:

  • vm El protocolo a usar, esto le dirá a ActiveMQ que no vamos a salir de nuestra propia máquina virtual.
  • localhost es el nombre para el Broker. Se creará un Broker de nombre "localhost" si no existe ya. Podemos dar cualquier nombre que nos guste.
  • broker.persistent=false Por defecto el broker guardará todos los mensajes que enviemos en una base de datos en fichero, de forma que si esos mensajes no se entregan mientras se ejecuta nuestro programa, la próxima vez que lo arranquemos los seguiremos teniendo disponibles. Si lo queremos así, debemos añadir otra dependencia con activemq-kahadb-store-5.11.1.jar. Si no queremos persistencia, en la cadena de conexión debemos poner esta propiedad, evitando así también la dependencia de activemq-kahadb-store-5.11.1.jar.

Listo, ya tenemos, si no lo teníamos ya, un broker de ActiveMQ disponible dentro de nuestra máquina virtual para enviarnos mensajes a nosotros mismos.


Envío de mensajes[editar]

El resto ya es trabajo normal con ActiveMQ. Pondremos un ejemplo sencillo de envío y recepción de mensaje.

Para envío de mensaje, creamos una conexión, creamos una sesión, un destino, un productor de mensaje, un mensaje de texto y lo enviamos.

Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination = session.createQueue("UNA.COLA");

MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

TextMessage message = session.createTextMessage("Hello !!");
producer.send(message);

producer.close();
session.close();
connection.close();

Veamos algunos detalles.

  • Creamos la conexión con la connectionFactory que creamos anteriormente.
  • Arrancamos la conexión con connection.start()
  • Creamos una sesión. Los parámetros son:
    • boolean indicando si queremos que la sesión permita transacciones. Si permite transacciones, podemos enviar/recibir mensajes que no se enviarán o retirarán de la cola definitivamente hasta que no llamemos a session.commit(). Si algo va mal en el tratamiento de mensajes, podemos llamar a session.rollback() para reestablecer la cola de mensajes. Para nuestro ejemplo sencillo, no queremos transacciones.
    • Session.AUTO_ACKNOWLEDGE. A ActiveMQ hay que indicarle que hemos recibido el mensaje, para que se pueda olvidar de él. Con esta opción, no necesitamos decirle explícitamente que lo hemos recibido, se hará automáticamente en el momento que nos lo entregue.
  • Creamos la cola donde vamos a meter los mensajes. La cola se identifica con un nombre (un String) y puede ser el que queramos. Únicamente, el que envía el mensaje y el que lo recibe deben estar de acuerdo qué cola (qué nombre) van a usar. Esta llamada no crea realmente la cola en ActiveMQ, sólo es un nombre para identificarla más adelante.
  • Creamos el productor de mensajes con session.createProducer(), indicando que el destino será la cola que acabamos de crear.
  • Indicamos que no queremos que los mensajes sean persistentes, es decir, que no queremos que se guarden en ninguna base de datos. Cuando creamos el broker ya indicamos que no queríamos persistencia, así que esta llamada realmente sobra.
  • Para el envío de mensaje, vamos a crear un mensaje de texto. Session tiene varios métodos para crear mensajes de distintos tipos, pero no nos vamos a complicar en este ejemplo.
  • Finalmente, sólo nos queda enviar el mensaje.
  • Cuando terminemos, vamos cerrando cosas, en concreto, el productor, la sesión y la conexión. Evidentemente, siempre que no se estén usando en otras partes del código y no nos vayan a hacer falta más.


Recepción de mensajes[editar]

La recepción de mensaje puede ser de la siguiente forma

Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination = session.createQueue("UNA.COLA");

MessageConsumer consumer = session.createConsumer(destination);

Message message = consumer.receive(1000);
if ((null!=message) && (message instanceof TextMessage)) {
   System.out.println("Received : " + ((TextMessage) message).getText());
}

consumer.close();
session.close();
connection.close();

Nos saltamos la explicación de las primeras líneas, puesto que son iguales que en el caso del productor de mensajes. Vamos directamente a la línea donde se crea el consumidor de mensajes.

  • Creamos el consumidor de mensajes con session.createConsumer(), indicando como destino la cola de la que queremos leer los mensajes.
  • La llamada a consumer.receive() se queda bloqueada hasta que nos llegue un mensaje. Se puede pasar como parámetro el tiempo máximo en milisegundos que queremos que esta llamada se quede bloqueada en espera. Al terminar, o bien tenemos un mensaje null si ha pasado el tiempo de espera sin recibir mensaje, o bien tenemos el mensaje que nos ha llegado. Alternativamente, podríamos llamar a consumer.setMessageListener() para pasarle un listener al que ActivmeMQ avisará cuando llegue un mensaje. De esta forma, no necesitamos llamar a receive().
  • Verificamos si el mensaje ha llegado (no es null) y si es del tipo que esperamos (TextMessage)
  • Sacamos por pantalla el texto recibido.

Sólo nos queda cerrar todo si ya no lo necesitamos más.


Enlaces[editar]

  • El [1] código completo de este ejemplo. En el código se crea un productor y 10 consumidores. El productor envía 100 mensajes, que ActiveMQ va repartiendo entre los consumidores.
  • Más tutoriales de ActiveMQ