Django: clearing a redis connection after disconnecting the client from the stream

I'm using the Server-Sent Events API in a Django application to stream live updates from my backend to the browser. The backend is Redis pub/sub. My Django view looks like this:

    def event_stream(request):
        """
        Stream worker events out to browser.
        """
        listener = events.Listener(
            settings.EVENTS_PUBSUB_URL,
            channels=[settings.EVENTS_PUBSUB_CHANNEL],
            buffer_key=settings.EVENTS_BUFFER_KEY,
            last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
        )
        return http.HttpResponse(listener, mimetype='text/event-stream')

And the events.Listener class, which gets returned as an iterator, looks like this:

    class Listener(object):
        def __init__(self, rcon_or_url, channels, buffer_key=None,
                     last_event_id=None):
            if isinstance(rcon_or_url, redis.StrictRedis):
                self.rcon = rcon_or_url
            elif isinstance(rcon_or_url, basestring):
                self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
            self.channels = channels
            self.buffer_key = buffer_key
            self.last_event_id = last_event_id
            self.pubsub = self.rcon.pubsub()
            self.pubsub.subscribe(channels)

        def __iter__(self):
            # If we've been initted with a buffer key, then get all the events off
            # that and spew them out before blocking on the pubsub.
            if self.buffer_key:
                buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)

                # Check whether msg with last_event_id is still in buffer.  If so,
                # trim buffered_events to have only newer messages.
                if self.last_event_id:
                    # Note that we're looping through most recent messages first
                    # here.
                    counter = 0
                    for msg in buffered_events:
                        if json.loads(msg)['id'] == self.last_event_id:
                            break
                        counter += 1
                    buffered_events = buffered_events[:counter]

                for msg in reversed(list(buffered_events)):
                    # Stream out oldest messages first
                    yield to_sse({'data': msg})

            try:
                for msg in self.pubsub.listen():
                    if msg['type'] == 'message':
                        yield to_sse(msg)
            finally:
                logging.info('Closing pubsub')
                self.pubsub.close()
                self.rcon.connection_pool.disconnect()
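(The utils.parse_redis_url helper isn't shown in the question. A minimal sketch of what such a helper might look like, using the standard library's URL parser — the function name mirrors the code above, but the implementation and returned keys are assumptions, shown in Python 3 syntax:)

```python
from urllib.parse import urlparse

def parse_redis_url(url):
    """Split a redis:// URL into kwargs accepted by redis.StrictRedis.

    Hypothetical stand-in for the author's utils.parse_redis_url helper.
    """
    parsed = urlparse(url)
    # A path component like "/2" selects the Redis database number.
    path = parsed.path.lstrip('/')
    return {
        'host': parsed.hostname or 'localhost',
        'port': parsed.port or 6379,
        'db': int(path) if path else 0,
        'password': parsed.password,
    }
```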

I can successfully stream events out to the browser with this setup. However, the disconnect calls in the listener's finally block never actually get called. I assume the listeners are still blocked in pubsub.listen(), waiting for messages that never come. As clients disconnect and reconnect, the number of connections to my Redis instance climbs and never goes down. Once it reaches about 1000, Redis starts to struggle and consumes all available CPU.

I would like to be able to detect when the client is no longer listening and close the Redis connection(s) at that point.
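The mechanics of the leak can be reproduced without Redis or Django: a generator's finally clause doesn't run just because the consumer stops iterating; it runs only when the generator is exhausted, explicitly closed, or garbage-collected. A minimal sketch:

```python
log = []

def event_source():
    try:
        yield 'event 1'
        yield 'event 2'
    finally:
        # This is where connection cleanup would happen.
        log.append('cleanup')

gen = event_source()
next(gen)                 # the client reads one event, then "disconnects"
assert log == []          # cleanup has NOT run; the generator is just suspended
gen.close()               # a well-behaved WSGI server calls close() on the iterator
assert log == ['cleanup']
```

If nothing ever calls close() on the suspended generator, the finally clause (and the Redis cleanup inside it) never executes.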

Things I've tried or considered:

  • A connection pool. But as the redis-py README states, it is not safe to pass PubSub or Pipeline objects between threads.
  • Middleware to handle the connections, or maybe just the disconnects. This won't work because a middleware's process_response() method gets called too early (before the HTTP headers are even sent to the client). I need something that gets called when the client disconnects while I'm in the middle of streaming content to them.
  • The request_finished and got_request_exception signals. The first, like process_response() in middleware, seems to fire too soon. The second isn't called when a client disconnects mid-stream.

Final wrinkle: in production I'm using gevent so I can get away with keeping a lot of connections open at once. However, this connection-leak problem occurs whether I'm using plain old 'manage.py runserver', the gevent-monkeypatched runserver, or Gunicorn's gevent workers.

1 answer

UPDATE: As of Django 1.5, you'll need to return an instance of StreamingHttpResponse if you want to lazily stream things out as in this question/answer.

ORIGINAL ANSWER BELOW

After a lot of flailing around and reading framework code, I've found what I think is the right answer to this question.

  • According to the WSGI PEP, if your application returns an iterator with a close() method, the WSGI server should call it once the response has finished. Django supports this too. That's a natural place to do the Redis cleanup I need.
  • There's a bug in Python's wsgiref implementation, and by extension in Django's 'runserver', that causes close() to be skipped if the client disconnects from the server mid-stream. I've submitted a patch.
  • Even if the server honors close(), it won't be called until a write to the client actually fails. If your iterator is blocked waiting on the pubsub and not sending anything, close() will never be called. I've worked around this by sending a no-op message into the pubsub each time a client connects. That way, when a browser does a normal reconnect, the now-defunct threads will try to write to their closed connections, throw an exception, and then get cleaned up when the server calls close(). The SSE spec says that any line beginning with a colon is a comment that should be ignored, so I just send ":\n" as my no-op message to flush out stale clients.
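The close() contract described in the first bullet can be illustrated with a tiny simulation of what a PEP 3333-compliant server does after the response loop (hypothetical class and function names; a real server also negotiates headers and handles write errors):

```python
class StreamingBody:
    """An iterable response body that owns a resource needing cleanup."""
    def __init__(self, events):
        self.events = events
        self.closed = False

    def __iter__(self):
        return iter(self.events)

    def close(self):
        # Per PEP 3333, the server must call this when the response
        # finishes, even if the client disconnected partway through.
        self.closed = True

def serve(body, client_disconnects_after=None):
    """Crude stand-in for a WSGI server's response loop."""
    sent = []
    try:
        for i, chunk in enumerate(body):
            if client_disconnects_after is not None and i >= client_disconnects_after:
                break  # the client went away mid-stream
            sent.append(chunk)
    finally:
        if hasattr(body, 'close'):
            body.close()
    return sent

body = StreamingBody(['a', 'b', 'c'])
assert serve(body, client_disconnects_after=1) == ['a']
assert body.closed  # cleanup ran despite the mid-stream disconnect
```

The wsgiref bug mentioned above is precisely a server failing to uphold this finally-style guarantee when the client disconnects mid-stream.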

Here is the new code. First up is the Django view:

    def event_stream(request):
        """
        Stream worker events out to browser.
        """
        return events.SSEResponse(
            settings.EVENTS_PUBSUB_URL,
            channels=[settings.EVENTS_PUBSUB_CHANNEL],
            buffer_key=settings.EVENTS_BUFFER_KEY,
            last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
        )

And the Listener class that does the work, along with a helper function for formatting the SSE messages and an HttpResponse subclass that lets the view be a little cleaner:

    class Listener(object):
        def __init__(self,
                     rcon_or_url=settings.EVENTS_PUBSUB_URL,
                     channels=None,
                     buffer_key=settings.EVENTS_BUFFER_KEY,
                     last_event_id=None):
            if isinstance(rcon_or_url, redis.StrictRedis):
                self.rcon = rcon_or_url
            elif isinstance(rcon_or_url, basestring):
                self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
            if channels is None:
                channels = [settings.EVENTS_PUBSUB_CHANNEL]
            self.channels = channels
            self.buffer_key = buffer_key
            self.last_event_id = last_event_id
            self.pubsub = self.rcon.pubsub()
            self.pubsub.subscribe(channels)

            # Send a superfluous message down the pubsub to flush out stale
            # connections.
            for channel in self.channels:
                # Use buffer_key=None since these pings never need to be
                # remembered and replayed.
                sender = Sender(self.rcon, channel, None)
                sender.publish('_flush', tags=['hidden'])

        def __iter__(self):
            # If we've been initted with a buffer key, then get all the events off
            # that and spew them out before blocking on the pubsub.
            if self.buffer_key:
                buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)

                # Check whether msg with last_event_id is still in buffer.  If so,
                # trim buffered_events to have only newer messages.
                if self.last_event_id:
                    # Note that we're looping through most recent messages first
                    # here.
                    counter = 0
                    for msg in buffered_events:
                        if json.loads(msg)['id'] == self.last_event_id:
                            break
                        counter += 1
                    buffered_events = buffered_events[:counter]

                for msg in reversed(list(buffered_events)):
                    # Stream out oldest messages first
                    yield to_sse({'data': msg})

            for msg in self.pubsub.listen():
                if msg['type'] == 'message':
                    yield to_sse(msg)

        def close(self):
            self.pubsub.close()
            self.rcon.connection_pool.disconnect()


    class SSEResponse(HttpResponse):
        def __init__(self, rcon_or_url, channels, buffer_key=None,
                     last_event_id=None, *args, **kwargs):
            self.listener = Listener(rcon_or_url, channels, buffer_key,
                                     last_event_id)
            super(SSEResponse, self).__init__(self.listener,
                                              mimetype='text/event-stream',
                                              *args, **kwargs)

        def close(self):
            """
            This will be called by the WSGI server at the end of the request,
            even if the client disconnects midstream.  Unless you're using
            Django's runserver, in which case you should expect to see Redis
            connections build up until http://bugs.python.org/issue16220 is
            fixed.
            """
            self.listener.close()


    def to_sse(msg):
        """
        Given a Redis pubsub message that was published by a Sender (ie, has a
        JSON body with time, message, title, tags, and id), return a
        properly-formatted SSE string.
        """
        data = json.loads(msg['data'])

        # According to the SSE spec, lines beginning with a colon should be
        # ignored.  We can use that as a way to force zombie listeners to try
        # pushing something down the socket and clean up their redis
        # connections when they get an error.
        # See http://dev.w3.org/html5/eventsource/#event-stream-interpretation
        if data['message'] == '_flush':
            return ":\n"  # Administering colonic!

        if 'id' in data:
            out = "id: " + data['id'] + '\n'
        else:
            out = ''
        if 'name' in data:
            out += 'name: ' + data['name'] + '\n'

        payload = json.dumps({
            'time': data['time'],
            'message': data['message'],
            'tags': data['tags'],
            'title': data['title'],
        })
        out += 'data: ' + payload + '\n\n'
        return out
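As a self-contained check of the wire format a browser's EventSource expects (field lines, a blank-line terminator, and colon-prefixed comment lines that clients silently ignore), here's a minimal frame builder in the same shape as to_sse — the function name and parameters are illustrative, not part of the answer's code:

```python
import json

def format_sse(data, event_id=None):
    """Build one SSE frame: optional id line, data line, blank terminator."""
    out = ''
    if event_id is not None:
        out += 'id: ' + str(event_id) + '\n'
    out += 'data: ' + json.dumps(data) + '\n\n'
    return out

frame = format_sse({'message': 'hello'}, event_id='42')
assert frame == 'id: 42\ndata: {"message": "hello"}\n\n'

# A keepalive/no-op is just a comment line; per the SSE spec, any line
# starting with a colon is ignored by the client.
keepalive = ':\n'
assert keepalive.startswith(':')
```

The trailing blank line is what tells the client a complete event has arrived; without it, EventSource buffers the fields and dispatches nothing.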

Source: https://habr.com/ru/post/1439300/

