RabbitMQ has developed an excellent Java AMQP library. The full API documentation for the library can be found here. There are different ways to use the Spring Framework with RabbitMQ:
Begin by adding the AMQP library as a dependency in your pom.xml file (use the latest version):
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
<scope>compile</scope>
</dependency>
Fetch the CLOUDAMQP_URL environment variable and substitute the URL of a local RabbitMQ instance if the variable is not set.
Create a ConnectionFactory and configure it with the URL. All connections are created from this factory, and on the connection we create a Channel.
Declare a queue and publish a message to the "default exchange" with the queue name as routing key. This is a shortcut: every queue is by default bound to the default exchange with its own name as the routing key.
Then start a consumer that listens to that queue and prints the message body to the console.
The full code can be seen at github.com/cloudamqp/java-amqp-example.
public static void main(String[] args) throws Exception {
String uri = System.getenv("CLOUDAMQP_URL");
if (uri == null) uri = "amqp://guest:guest@localhost";
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(uri);
//Recommended settings
factory.setConnectionTimeout(30000);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue = "hello"; //queue name
boolean durable = false; //durable - if true, the queue survives a broker restart
boolean exclusive = false; //exclusive - if true, the queue can only be used by the connection that declared it
boolean autoDelete = false; //autodelete - if true, the queue is deleted when the last consumer unsubscribes
channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
String message = "Hello CloudAMQP!";
String exchangeName = "";
String routingKey = "hello";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8")); //encode as UTF-8 to match the consumer
System.out.println(" [x] Sent '" + message + "'");
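//Print every delivered message; the name "received" avoids redeclaring the local variable "message"
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String received = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + received + "'");
};
//autoAck = true; the empty callback ignores consumer cancellation
channel.basicConsume(queue, true, deliverCallback, consumerTag -> { });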
}
If you are using the Spring Framework for Java you can use the spring-amqp library for RabbitMQ. Good documentation about the library can be found in the Spring documentation for AMQP. Begin by adding the following dependency to the dependencies section of your pom.xml file (use the latest version):
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
</dependencies>
Look up the CLOUDAMQP_URL for your instance on its console page. The URL has the form amqp://username:password@host/vhost and supplies the values used below.
Create a CachingConnectionFactory and configure it with username, password and vhost. All connections are created from this factory and on the connection we create a Channel.
public static void main(String[] args) {
// set up the connection
CachingConnectionFactory connectionFactory=new CachingConnectionFactory("bunny.cloudamqp.com");
connectionFactory.setUsername("rozcdysg");
connectionFactory.setPassword("Mx9GntDW4WBJvmY2_M_Qr2_a4gRGc3_G");
connectionFactory.setVirtualHost("rozcdysg");
//Recommended settings
connectionFactory.setRequestedHeartBeat(30);
connectionFactory.setConnectionTimeout(30000);
//Set up queue, exchanges and bindings
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
Queue queue = new Queue("myQueue");
admin.declareQueue(queue);
TopicExchange exchange = new TopicExchange("myExchange");
admin.declareExchange(exchange);
admin.declareBinding(
BindingBuilder.bind(queue).to(exchange).with("foo.*"));
//Set up the listener
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
Object listener = new Object() {
public void handleMessage(String foo) {
System.out.println(foo);
}
};
//Attach the listener to the container and start consuming
MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
container.setMessageListener(adapter);
container.setQueueNames("myQueue");
container.start();
//Send a message
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myExchange", "foo.bar", "Hello CloudAMQP!");
try{
Thread.sleep(1000);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
container.stop();
}
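The example above hard-codes the host, username, password and vhost. As a minimal sketch, those values could instead be derived from the CLOUDAMQP_URL string using only the JDK. The class name AmqpUrlParts and its fields are hypothetical and not part of spring-amqp or the RabbitMQ client; the guest/guest fallback and the default ports 5672 (amqp) and 5671 (amqps) are the standard RabbitMQ defaults. The sketch assumes the username contains no colon.

```java
import java.net.URI;

// Hypothetical helper: splits an amqp:// or amqps:// URL into connection settings.
final class AmqpUrlParts {
    final String host;
    final int port;
    final String username;
    final String password;
    final String virtualHost;

    private AmqpUrlParts(String host, int port, String username,
                         String password, String virtualHost) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
    }

    static AmqpUrlParts parse(String url) {
        URI uri = URI.create(url);
        String scheme = uri.getScheme();
        boolean tls = "amqps".equalsIgnoreCase(scheme);
        if (!tls && !"amqp".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException("Expected an amqp:// or amqps:// URL");
        }
        // Fall back to the default port of the scheme when the URL has none.
        int port = uri.getPort() != -1 ? uri.getPort() : (tls ? 5671 : 5672);

        // Assumption: use the broker's default guest account when no credentials are given.
        String username = "guest";
        String password = "guest";
        String userInfo = uri.getUserInfo(); // already percent-decoded
        if (userInfo != null) {
            int colon = userInfo.indexOf(':');
            if (colon < 0) {
                username = userInfo;
            } else {
                username = userInfo.substring(0, colon);
                password = userInfo.substring(colon + 1);
            }
        }

        // An absent path means the default vhost "/"; otherwise drop the leading slash.
        String path = uri.getPath();
        String virtualHost = (path == null || path.isEmpty()) ? "/" : path.substring(1);

        return new AmqpUrlParts(uri.getHost(), port, username, password, virtualHost);
    }
}
```

The parsed fields could then be passed to the CachingConnectionFactory constructor and to setUsername, setPassword and setVirtualHost in place of the literal strings, which keeps credentials out of the source code.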
If you are using the Spring Framework for Java you can use the spring-amqp library for RabbitMQ. Good documentation about the library can be found in the Spring documentation for AMQP. Begin by adding the following dependency to the dependencies section of your pom.xml file (use the latest version):
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
</dependencies>
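Look up the CLOUDAMQP_URL for your instance on its console page. The URL has the form amqp://username:password@host/vhost and supplies the host, vhost, username and password used in the bean definitions.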
Start by creating the XML bean definitions in listener-context.xml and sender-context.xml.
<?xml version="1.0" encoding="UTF-8" ?>
<!-- /cloudamqp/src/main/resources/listener-context.xml -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<rabbit:connection-factory id="connectionFactory"
host="host"
virtual-host="vhost"
username="username"
password="password" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- Create queue -->
<rabbit:queue id="mySpringQueue" auto-delete="false" name="mySpringQueue" />
<!-- create myExchange and bind mySpringQueue to myExchange-->
<rabbit:topic-exchange id="myExchange" name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="mySpringQueue" pattern="my.*"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- instantiate listener -->
<bean id="myListener" class="com.cloudamqp.amqp.Listener" />
<rabbit:listener-container id="myListenerContainer" connection-factory="connectionFactory">
<rabbit:listener ref="myListener" queues="mySpringQueue" /></rabbit:listener-container>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<!-- /cloudamqp/src/main/resources/sender-context.xml -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- create a rabbit connection factory with specified parameters -->
<rabbit:connection-factory id="connectionFactory"
host="host"
virtual-host="vhost"
username="username"
password="password" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- create a bean that sends messages to myExchange -->
<rabbit:template id="cloudamqpTemplate" connection-factory="connectionFactory" exchange="myExchange"/>
</beans>
Create a Listener class that implements MessageListener. This class will listen for messages from mySpringQueue, as specified in the listener-context.xml file.
package com.cloudamqp.amqp;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class Listener implements MessageListener {
public void onMessage(Message message) {
String messageBody= new String(message.getBody());
System.out.println("Message received: "+messageBody);
}
}
Create a ListenerContainer class that reads resources from the context file.
package com.cloudamqp.amqp;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ListenerContainer {
public static void main(String[] args) {
ApplicationContext c1 = new ClassPathXmlApplicationContext("listener-context.xml");
}
}
Create a Sender class that sends messages to myExchange with the routing key my.routingkey.
package com.cloudamqp.amqp;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Sender {
public static void main(String[] args) throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("sender-context.xml");
AmqpTemplate aTemplate = (AmqpTemplate) context.getBean("cloudamqpTemplate");
for (int i = 1; i < 6; i++)
{
aTemplate.convertAndSend("my.routingkey", "Hello CloudAMQP, Message # " +i);
}
}
}
Run the ListenerContainer as a Java application, then run the Sender as a Java application. The listener will display the messages sent by the sender.
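The messages arrive because the routing key my.routingkey matches the binding pattern my.* on the topic exchange. In a topic pattern, words are separated by dots, * stands for exactly one word and # stands for zero or more words. The following is a rough sketch of that matching rule for illustration only; the class TopicPattern is hypothetical, and the broker's real implementation is different and far more efficient.

```java
// Hypothetical illustration of topic-exchange pattern matching.
final class TopicPattern {

    // Returns true if the routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' matches exactly one word; a literal must match exactly.
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

Under this rule, TopicPattern.matches("my.*", "my.routingkey") is true, while a key with an extra word such as my.routing.key would not match my.* and would need a pattern like my.# instead.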