Python client for PubSub returns StatusCode.UNAVAILABLE

I am trying to set up a long-running pull subscription on a Google Cloud PubSub topic. I use code very similar to the example provided in the documentation, i.e.:

    import time

    from google.cloud import pubsub_v1


    def receive_messages(project, subscription_name):
        """Receives messages from a pull subscription."""
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(
            project, subscription_name)

        def callback(message):
            print('Received message: {}'.format(message))
            message.ack()

        subscriber.subscribe(subscription_path, callback=callback)

        # The subscriber is non-blocking, so we must keep the main thread from
        # exiting to allow it to process messages in the background.
        print('Listening for messages on {}'.format(subscription_path))
        while True:
            time.sleep(60)

The problem is that I sometimes get the following traceback:

    Exception in thread Consumer helper: consume bidirectional stream:
    Traceback (most recent call last):
      File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
        self.run()
      File "/usr/lib/python3.5/threading.py", line 862, in run
        self._target(*self._args, **self._kwargs)
      File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
        self._policy.on_exception(exc)
      File "/path/to/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception
        raise exception
      File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume
        for response in response_generator:
      File "/path/to/grpc/_channel.py", line 348, in __next__
        return self._next()
      File "/path/to/grpc/_channel.py", line 342, in _next
        raise self
    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

I saw that another question discusses this error, but here I am asking how to handle it properly in Python. I tried wrapping the call in a try/except block, but the subscriber runs in a background thread, so the exception never reaches my code and I cannot retry when this error occurs.
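To illustrate why the try/except does not help, here is a minimal standard-library sketch. It does not use the PubSub client at all: `worker` and `run_demo` are hypothetical names, and the `RuntimeError` only stands in for the gRPC error. It also assumes Python 3.8 or newer, because it relies on `threading.excepthook`.

```python
import threading


def worker():
    # Stands in for the library's consumer thread failing with UNAVAILABLE.
    raise RuntimeError("UNAVAILABLE (simulated)")


def run_demo():
    seen = []
    original_hook = threading.excepthook
    # Record uncaught thread exceptions instead of printing a traceback.
    threading.excepthook = lambda args: seen.append(args.exc_value)
    caught = False
    try:
        thread = threading.Thread(target=worker, name="Consumer helper")
        thread.start()  # returns immediately, much like subscriber.subscribe()
        thread.join()
    except RuntimeError:
        caught = True  # never reached: the error was raised in another thread
    finally:
        threading.excepthook = original_hook
    return caught, seen


caught, seen = run_demo()
print("caught in main thread:", caught)
print("seen by thread hook:", seen)
```

The `except` clause in the main thread never fires; the error only shows up in the thread-level hook. That is why the handling has to happen inside whatever code runs on the background thread.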

1 answer

A somewhat hacky approach that works for me is a custom policy_class. The default policy has an on_exception function that ignores DEADLINE_EXCEEDED. You can create a class that inherits from the default policy and also ignores UNAVAILABLE. Mine looks like this:

    from google.cloud import pubsub
    from google.cloud.pubsub_v1.subscriber.policy import thread
    import grpc


    class AvailablePolicy(thread.Policy):
        def on_exception(self, exception):
            """The parent ignores DEADLINE_EXCEEDED. Let's also ignore UNAVAILABLE.

            I'm not sure what triggers that error, but if you ignore it, your
            subscriber seems to work just fine. It's probably an intermittent
            thing and it reconnects later if you just give it a chance.
            """
            # If this is UNAVAILABLE, then we want to retry.
            # That entails just returning None.
            unavailable = grpc.StatusCode.UNAVAILABLE
            if getattr(exception, 'code', lambda: None)() == unavailable:
                return

            # For anything else, fallback on super.
            super(AvailablePolicy, self).on_exception(exception)


    subscriber = pubsub.SubscriberClient(policy_class=AvailablePolicy)

    # Continue to set up as normal.

This is very similar to the original on_exception; it just ignores one more error. If you want, you can add some logging whenever the exception is ignored and verify that everything still works. Future messages will still come through.
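The logging could look roughly like the sketch below. It is written against the standard library only so that it runs without grpc or the PubSub client installed: `StatusCode` is a stand-in for `grpc.StatusCode`, and `FakeRpcError` and `should_ignore` are hypothetical names that are not part of any library. In the real policy you would do the same check and log call inside `on_exception` before returning.

```python
import enum
import logging

logger = logging.getLogger("pubsub.policy")


class StatusCode(enum.Enum):
    """Stand-in for grpc.StatusCode (assumption, so the sketch runs alone)."""
    DEADLINE_EXCEEDED = "deadline exceeded"
    UNAVAILABLE = "unavailable"
    PERMISSION_DENIED = "permission denied"


class FakeRpcError(Exception):
    """Hypothetical error that exposes code() the way gRPC call errors do."""

    def __init__(self, status):
        super().__init__(status.name)
        self._status = status

    def code(self):
        return self._status


def should_ignore(exception, ignorable=frozenset({StatusCode.UNAVAILABLE})):
    """Return True, and log a warning, if the exception has an ignorable status."""
    # Same getattr trick as in the policy: exceptions without code() give None.
    status = getattr(exception, "code", lambda: None)()
    if status in ignorable:
        logger.warning("Ignoring %s, waiting for the stream to recover: %s",
                       status.name, exception)
        return True
    return False


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    for error in (FakeRpcError(StatusCode.UNAVAILABLE),
                  FakeRpcError(StatusCode.PERMISSION_DENIED),
                  ValueError("not an RPC error")):
        print(type(error).__name__, should_ignore(error))
```

Counting the warnings in your logs is an easy way to check how often the error really occurs and that messages keep arriving afterwards.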


Source: https://habr.com/ru/post/1272692/

