In my previous post I had described a scenario to synchronously consuming message from RabbitMQ. Here is a way to do it in python.To make things simple to understand I just wrote a dummy program that dumps the content of a RabbitMQ queue and then you can use the same program to remove a message also from queue by iterating all messages and acknowledging it. (its a dumb implementation so dont judge the coding, the intent is to demonstrate synchronous consumption of queue contents).
import sys from amqplib import client_0_8 as amqp messageIdToRemove = None chan = None def process_message(msg): print "=================================================" print "Properties =" print msg.properties print "Body=" print msg.body if op == "remove_message": if messageIdToRemove == msg.properties['message_id']: print "@@@@@@removing message@@@@@@@@@@@@@@@@@@" chan.basic_ack(msg.delivery_tag) if __name__ == '__main__': if len(sys.argv) < 9: print "Usage python list_queue_messages.py mq_url mq_user mq_pass mq_vhost mq_exchange mq_queue_name mq_routing_key [list_queue|remove_message] messageIdToRemove" exit() mq_url = sys.argv[1] mq_user = sys.argv[2] mq_pass = sys.argv[3] mq_vhost = sys.argv[4] mq_exchange = sys.argv[5] mq_queue_name = sys.argv[6] mq_routing_key = sys.argv[7] op=sys.argv[8] if op == "remove_message": messageIdToRemove=sys.argv[9] conn = amqp.Connection(host=mq_url, userid=mq_user, password=mq_pass, virtual_host=mq_vhost, insist=False); chan = conn.channel(); chan.queue_declare(queue=mq_queue_name, durable=True, exclusive=False, auto_delete=False); chan.exchange_declare(exchange=mq_exchange, type="direct", durable=True, auto_delete=False); chan.queue_bind(queue=mq_queue_name, exchange=mq_exchange, routing_key=mq_routing_key) print "Consumer dumping messages from %s" % mq_queue_name i=0 try: while True: msg = chan.basic_get(mq_queue_name) if msg is None: break; else: i=i+1 process_message(msg) finally: chan.close(); conn.close(); print "There are %d messages in the queue" % i
Comments
Post a Comment