In my previous post I described a scenario for synchronously consuming messages from RabbitMQ. Here is a way to do it in Python. To keep things simple, I wrote a dummy program that dumps the contents of a RabbitMQ queue; the same program can also remove a message from the queue by iterating over all the messages and acknowledging the one whose message id matches. (It's a dumb implementation, so don't judge the coding; the intent is to demonstrate synchronous consumption of queue contents.)
import sys
from amqplib import client_0_8 as amqp
# Module-level state shared between the script entry point and
# process_message().  The entry point fills these in from the command line
# before any message is processed.
messageIdToRemove = None  # message_id to acknowledge when op == "remove_message"
chan = None               # open channel used to acknowledge messages
op = None                 # requested operation: "list_queue" or "remove_message"


def process_message(msg):
    """Dump one message and, in remove mode, acknowledge it if its id matches.

    The message's properties and body are always printed.  When the global
    ``op`` is ``"remove_message"`` and the message's ``message_id`` property
    equals the global ``messageIdToRemove``, the message is acknowledged on
    the global channel ``chan``, which is how this tool removes it from the
    queue.

    Args:
        msg: a fetched message exposing ``properties`` (a dict), ``body`` and
            ``delivery_tag``.
    """
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print("=================================================")
    print("Properties =")
    print(msg.properties)
    print("Body=")
    print(msg.body)

    if op != "remove_message":
        return

    # message_id is an optional property; a message published without one
    # must be skipped rather than abort the whole run with a KeyError.
    message_id = msg.properties.get('message_id')
    if message_id is not None and message_id == messageIdToRemove:
        print("@@@@@@removing message@@@@@@@@@@@@@@@@@@")
        chan.basic_ack(msg.delivery_tag)
if __name__ == '__main__':
    # Script entry point: connect to the broker, make sure the queue, the
    # exchange and their binding exist, then fetch messages one at a time
    # with basic_get until the queue reports empty, handing each message to
    # process_message().
    usage = "Usage python list_queue_messages.py mq_url mq_user mq_pass mq_vhost mq_exchange mq_queue_name mq_routing_key [list_queue|remove_message] messageIdToRemove"

    if len(sys.argv) < 9:
        print(usage)
        sys.exit(1)  # non-zero status: the invocation was invalid

    mq_url = sys.argv[1]
    mq_user = sys.argv[2]
    mq_pass = sys.argv[3]
    mq_vhost = sys.argv[4]
    mq_exchange = sys.argv[5]
    mq_queue_name = sys.argv[6]
    mq_routing_key = sys.argv[7]
    op = sys.argv[8]

    if op == "remove_message":
        # The message id is a tenth argv entry that only this operation
        # needs; check for it explicitly instead of failing with IndexError.
        if len(sys.argv) < 10:
            print(usage)
            sys.exit(1)
        messageIdToRemove = sys.argv[9]

    conn = amqp.Connection(host=mq_url,
                           userid=mq_user,
                           password=mq_pass,
                           virtual_host=mq_vhost,
                           insist=False)
    message_count = 0
    try:
        chan = conn.channel()
        try:
            chan.queue_declare(queue=mq_queue_name, durable=True,
                               exclusive=False, auto_delete=False)
            chan.exchange_declare(exchange=mq_exchange, type="direct",
                                  durable=True, auto_delete=False)
            chan.queue_bind(queue=mq_queue_name, exchange=mq_exchange,
                            routing_key=mq_routing_key)

            print("Consumer dumping messages from %s" % mq_queue_name)

            # Synchronous consumption: basic_get returns None once there is
            # nothing left to fetch.  Messages that process_message() does
            # not acknowledge are presumably returned to the queue by the
            # broker when the channel closes -- confirm against the broker's
            # acknowledgement semantics.
            while True:
                msg = chan.basic_get(mq_queue_name)
                if msg is None:
                    break
                message_count += 1
                process_message(msg)
        finally:
            chan.close()
    finally:
        # Runs even if opening the channel, a declare/bind call, or
        # chan.close() itself raised, so the connection is never leaked.
        conn.close()

    print("There are %d messages in the queue" % message_count)
Comments
Post a Comment