Pika, stop_consuming does not work

I am new to rabbitmq and pika, and I have problems with stopping consumption.

channel and queue settings:

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True) 

In principle, the consumer and the manufacturer:

consumer:

 def task(task_id): def callback(channel, method, properties, body): if body != "quit": print(body) else: print(body) channel.stop_consuming(task_id) channel.basic_consume(callback, queue=task_id, no_ack=True) channel.start_consuming() print("finish") return "finish" 

manufacturer:

 proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE) while proc.returncode is None: # running line = proc.stdout.readline() if line: channel.basic_publish( exchange='', routing_key=self.request.id, body=line ) else: channel.basic_publish( exchange='', routing_key=self.request.id, body="quit" ) break 

The task consumer gave me the output:

 # ... output from sample.sh, as expected quit  }q(UstatusqUSUCCESSqU tracebackqNUresultqNUtask_idqU 1419350416qUchildrenq]u. 

However, "finish" did not print, so I assume that channel.stop_consuming(task_id) does not stop consuming. If so, what is the correct way? Thanks.

+6
source share
1 answer

I had the same problem. This seems to be due to the fact that inside start_consuming calls self.connection.process_data_events(time_limit=None) . This time_limit=None makes it hang.

I managed to solve this problem by replacing the call with channel.start_consuming() its execution, hacking:

 while channel._consumer_infos: channel.connection.process_data_events(time_limit=1) # 1 second 
+5
source

Source: https://habr.com/ru/post/980089/


All Articles