Skip to main content

RabbitMQ java clients for beginners

Here is a sample of a consumer and producer example for RabbitMQ. The steps are
  1. Download Erlang
  2. Download Rabbit MQ Server
  3. Download Rabbit MQ Java client jars
  4. Compile and run the below two class and you are done.
This sample create a Durable Exchange, Queue and a Message. You will have to start the consumer first before you start the for the first time.

For more information on AMQP, Exchanges, Queues, read this excellent tutorial
http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/

+++++++++++++++++RabbitMQProducer.java+++++++++++++++++++++++++++

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.*;

public class RabbitMQProducer {
   public static void main(String []args) throws Exception {
 ConnectionFactory factory = new ConnectionFactory();
 factory.setUsername("guest");
 factory.setPassword("guest");
 factory.setVirtualHost("/");
 factory.setHost("127.0.0.1");
 factory.setPort(5672);
 Connection conn = factory.newConnection();
      Channel channel = conn.createChannel();
      String exchangeName = "myExchange";
      String routingKey = "testRoute";
      byte[] messageBodyBytes = "Hello, world!".getBytes();
      channel.basicPublish(exchangeName, routingKey
,MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes) ;
      channel.close();
      conn.close();
      }
}

+++++++++++++++++RabbitMQConsumer.java+++++++++++++++++++++++++++

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.*;
public class RabbitMQConsumer {
    public static void main(String []args) throws Exception {
 ConnectionFactory factory = new ConnectionFactory();
 factory.setUsername("guest");
 factory.setPassword("guest");
 factory.setVirtualHost("/");
 factory.setHost("127.0.0.1");
 factory.setPort(5672);
 Connection conn = factory.newConnection();
      Channel channel = conn.createChannel();
      String exchangeName = "myExchange";
      String queueName = "myQueue";
      String routingKey = "testRoute";
      boolean durable = true;
      channel.exchangeDeclare(exchangeName, "direct", durable);
      channel.queueDeclare(queueName, durable,false,false,null);
      channel.queueBind(queueName, exchangeName, routingKey);
      boolean noAck = false;
      QueueingConsumer consumer = new QueueingConsumer(channel);
      channel.basicConsume(queueName, noAck, consumer);
      boolean runInfinite = true;
      while (runInfinite) {
            QueueingConsumer.Delivery delivery;
            try {
               delivery = consumer.nextDelivery();
            } catch (InterruptedException ie) {
               continue;
            }
         System.out.println("Message received" 
+ new String(delivery.getBody()));
         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
      }
      channel.close();
      conn.close();
      }
}


See also 
  1. Purging a queuein rabbitmq
  2. Thumbnail generation using RabbitMQ
  3. Dumping Queue message contents
  4. Retrying failed messages
  5. Synchronously consume messages from a queue 

Comments

  1. Thanks for this, man...

    ReplyDelete
  2. Thanks I am glad it was useful for someone.

    ReplyDelete
  3. This code it is not valid for the last version RabbitMQ Java API 1.8 :

    ConnectionParameters no longer exists, to make it work it should be changed for:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(userName);
    factory.setPassword(password);
    factory.setVirtualHost(virtualHost);
    factory.setHostName(hostName);
    factory.setPortNumber(portNumber);
    Connection conn = factory.newConnection();

    ReplyDelete
  4. can u let me know how to assign the consumer to a listener which will start listening at the application start up

    ReplyDelete
  5. Sorry I didn't understood the question properly. But let me answer from what I understood, if you want the application to consumer message as soon as its up, thats what the consumer program I have written does but it does for a simple main program launched at command line. If you want to do it for a web application then you can create a Thread from javax.servlet.ServletContextListener that would do the same. The thread in its run method can do the same loop I am doing in class RabbitMQConsumer.

    ReplyDelete
  6. Thanks man this is really beneficial for beginner .

    ReplyDelete
  7. Hi ,
    I am new to RabbitMQ trying to play with Java. I have a typical problem which shows error as below. I am not sure where it went wrong while executing the .jar file. I try to follow the program above by you but it seems the problem lies somewhere else. Can you help me please.

    Exception in thread "main" java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:310)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:176)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:163)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:384)
    at java.net.Socket.connect(Socket.java:546)
    at java.net.Socket.connect(Socket.java:495)
    at com.rabbitmq.client.ConnectionFactory.createFrameHandler(ConnectionFactory.java:338)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:376)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:399)

    ReplyDelete
  8. Rudra this says connection refused which tells me that your rabbitmq is not up. Did your rabbitmq install went fine? Can you run ps -ef|grep rabbitmq or task manager and see it running?

    ReplyDelete
  9. Hi Kalpesh ,
    I tried the command and foud the results below. The daemon thread is running.Can you suggest me what can be done to set it right .


    rabbitmq 1252 1 0 09:34 ? 00:00:00 /usr/lib/erlang/erts-5.7.4/bin/epmd -daemon
    rudy 7429 7382 0 22:20 pts/0 00:00:00 grep --color=auto rabbitmq

    ReplyDelete
  10. Hey, thanks for the code, very helpful. I can run this on the same machine as the RabbitMQ server but can't connect when I run it on another machine and specify the host or ip address. Any ideas on what I need to do to enable. Failure results in either connection timed out or authentication error.

    Thanks!

    ReplyDelete
  11. Did you checked if there is a Firewall blocking it? I have never seen connection timed out connecting to rabbit server.

    ReplyDelete
  12. Can anyone please suggest whether Rabbitmq is a server (or) messaging protocol? Because i tried to start this rabbitmq using command line utility along with the batch files which is there inside the rabbitmq server folder(rabbitmq-server.bat,rabbitmq-service.bat)via console, but it doesn't works for me exactly. Thanks in advance.

    ReplyDelete
  13. RabbitMQ is a server and AMQP is the protocol it implements. I suggest you read http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/ for more info.

    ReplyDelete
    Replies
    1. Thanks for your help, i went through the link which you sent to me before,please suggest me whether i need to configure this rabbitmq in eclipse sdk (or) else i need to use this via console. I'm trying to do a messaging queue kind of a thing by having one sender and receiver, but i don't know exactly how to do this, and how to run this process on rabbitmq? If anyone know about this please help me over this. Thanks in advance.

      Delete
    2. RabbitMQ is like a database server so you have to start it from command line if you are on windows(I havent used on windows), then you write programs to produce/consumer messages from it. So rabbitmq is just a broker.

      Delete
    3. Again thanks for your reply Kalpesh, after writing both the producer as well as consumer java class in eclipse IDE, how i need to see both the sending and receiving messages in rabbitmq console i.e., via localhost in browser. Can any one explain about this process exactly please? Thanks in advance.

      Delete
    4. Last I know there was no option to view the queue contents so the trick is to just write a python program that would consume the message but wont acknowledge it. You might check this out http://neopatel.blogspot.com/2010/07/dumping-rabbitmq-queue-contents.html

      Delete
  14. So your are telling that there is no way to see the messaging queue console in rabbitmq via., writing java classes rite? So can you please suggest me any other possible way except (python program) to do a messaging queue kind of thing related to java program? Because i supposed to do this in java. (Already i wrote two java classes one is producer and another one is a consumer class, it is running exactly and i can see the sending and receiving of messages alone in eclipse IDE console and i couldn't see this in rabbitmq console for this process only i need a solution from somebody). Thanks in advance.

    ReplyDelete
    Replies
    1. Yes last I know you can only run command all I get is queue sizes.You can replicate the python program I wrote in java as basically you consume the messages but dont acknowledge it.

      "kpatel@kpatel-laptop:~$ sudo rabbitmqctl list_queues
      [sudo] password for kpatel:
      Listing queues ...
      thumbnail_gen_queue 0
      localhost:8080_bill_queue 0
      "

      Delete
  15. How to read messages from RabbitMQ's queue by having a tomcat server listen at a port without using Sprin framework? Any help is greatly appreciated

    ReplyDelete
  16. How to apply the message selectors in RabbitMQ queue.We can achieve using ActiveMQ functionality

    ReplyDelete

Post a Comment

Popular posts from this blog

Haproxy and tomcat JSESSIONID

One of the biggest problems I have been trying to solve at our startup is to put our tomcat nodes in HA mode. Right now if a customer comes, he lands on to a node and remains there forever. This has two major issues: 1) We have to overprovision each node with ability to handle worse case capacity. 2) If two or three high profile customers lands on to same node then we need to move them manually. 3) We need to cut over new nodes and we already have over 100+ nodes.  Its a pain managing these nodes and I waste lot of my time in chasing node specific issues. I loath when I know I have to chase this env issue. I really hate human intervention as if it were up to me I would just automate thing and just enjoy the fruits of automation and spend quality time on major issues rather than mundane task,call me lazy but thats a good quality. So Finally now I am at a stage where I can put nodes behing HAProxy in QA env. today we were testing the HA config and first problem I immediat...

Spring 3.2 quartz 2.1 Jobs added with no trigger must be durable.

I am trying to enable HA on nodes and in that process I found that in a two test node setup a job that has a frequency of 10 sec was running into deadlock. So I tried upgrading from Quartz 1.8 to 2.1 by following the migration guide but I ran into an exception that says "Jobs added with no trigger must be durable.". After looking into spring and Quartz code I figured out that now Quartz is more strict and earlier the scheduler.addJob had a replace parameter which if passed to true would skip the durable check, in latest quartz this is fixed but spring hasnt caught up to this. So what do you do, well I jsut inherited the factory and set durability to true and use that public class DurableJobDetailFactoryBean extends JobDetailFactoryBean {     public DurableJobDetailFactoryBean() {         setDurability(true);     } } and used this instead of JobDetailFactoryBean in the spring bean definition     <bean i...

Adding Jitter to cache layer

Thundering herd is an issue common to webapp that rely on heavy caching where if lots of items expire at the same time due to a server restart or temporal event, then suddenly lots of calls will go to database at same time. This can even bring down the database in extreme cases. I wont go into much detail but the app need to do two things solve this issue. 1) Add consistent hashing to cache layer : This way when a memcache server is added/removed from the pool, entire cache is not invalidated.  We use memcahe from both python and Java layer and I still have to find a consistent caching solution that is portable across both languages. hash_ring and spymemcached both use different points for server so need to read/test more. 2) Add a jitter to cache or randomise the expiry time: We expire long term cache  records every 8 hours after that key was added and short term cache expiry is 2 hours. As our customers usually comes to work in morning and access the cloud file server it ...