Call the twisted protocol method from another thread

I made a home security program in Python that uses GPIO for raspberries to sense movement and activate a siren. Users activate / deactivate the system using the NFC tag for the nfc-reader, also connected to the raspberry pi.

To do this, I need to constantly check the nfc tags in a non-blocking way and at the same time constantly check the sensors for movement and not block. I need a few more parallel actions, but I think that these two are enough to express my point of view.

Right now I am using threads that I start / stop like this: Stopping a stream after a certain time - I'm not sure if this is the best way, but at the moment the system is working fine.

Now I want to expand my functionality to offer notifications via websockets. I found that this can be done using Twisted, but I'm confused.

Here is a sample code of how I am trying to do this:

from twisted.internet import reactor from autobahn.websocket import WebSocketServerFactory, \ WebSocketServerProtocol, \ listenWS def thread1(stop_event): while(not stop_event.is_set()): stop_event.wait(4) print "checking sensor" # sensor_state = GPIO.input(11) if sensor_state == 1: # how can I call send_m("sensor detected movement") #<--- t1_stop_event.set() t1_stop_event = Event() t1 = Thread(target=thread1, args=(t1_stop_event,)) class EchoServerProtocol(WebSocketServerProtocol): def onMessage(self, msg, binary): print "received: "+msg print "stopping thread1" t1_stop_event.set() def send_m(self, msg): self.sendMessage(msg) if __name__ == '__main__': t1.start() factory = WebSocketServerFactory("ws://localhost:9000") factory.protocol = EchoServerProtocol listenWS(factory) reactor.run() 

So, how can I call a method to send a server protocol from a thread, such as thread1?

+4
source share
2 answers

As it often happens, the answer to your question about threads and Twisted is "don't use threads."

The reason you start the stream is because you can check the GPIO sensor multiple times. Does the sensor unit check? I guess not, because if it's a GPIO, it's locally available hardware and its results will be available right away. But I will give you the answer in both directions.

The main thing that you use for this topic is to do something repeatedly . If you want to repeat something in Twisted, there is never a reason to use streams :). Twisted includes an excellent API for repetitive tasks: LoopingCall . Your example rewritten to use LoopingCall (again, assuming the GPIO call is not blocking) would look like this:

 from somewhere import GPIO from twisted.internet import reactor, task from autobahn.websocket import WebSocketServerFactory, \ WebSocketServerProtocol, \ listenWS class EchoServerProtocol(WebSocketServerProtocol): def check_movement(self): print "checking sensor" sensor_state = GPIO.input(11) if sensor_state == 1: self.send_m("sensor detected movement") def connectionMade(self): WebSocketServerProtocol.connectionMade(self) self.movement_checker = task.LoopingCall(self.check_movement) self.movement_checker.start(4) def onMessage(self, msg, binary): self.movement_checker.stop() def send_m(self, msg): self.sendMessage(msg) if __name__ == '__main__': factory = WebSocketServerFactory("ws://localhost:9000") factory.protocol = EchoServerProtocol listenWS(factory) reactor.run() 

Of course, there is one case where you still need to use threads: if you need to perform a thread to check the GPIO (or any other your repetitive task), because it is a potentially blocking operation in the library that cannot be modified to make better use of Twisted, and you don’t want to block the main loop.

In this case, you still want to use LoopingCall and take advantage of one of its features: if you return Deferred from the function called by LoopingCall , it will not call it again until Deferred lights up. This means that you can transfer the task to the stream and not worry about the main loop filling in requests for this stream: you can simply resume the cycle in the main stream automatically when the stream ends.

To give you a more concrete idea of ​​what I mean, here is the check_movement function, modified to work with a long-term blocking call that is executed in a thread, instead of a quick poll call that can be run on the main loop:

 def check_movement(self): from twisted.internet.threads import deferToThread def get_input(): # this is run in a thread return GPIO.input(11) def check_input(sensor_state): # this is back on the main thread, and can safely call send_m if sensor_state == 1: self.send_m("sensor movement detected") return deferToThread(get_input).addCallback(check_input) 

Everything else in the above example remains the same.

+5
source

There are several factors in your example. Short answer: Check out this threading documentation in Twisted .

  • Until you need to use the Twisted reactor to use the protocol classes (the threads and protocol implementation are decoupled), you called reactor.run , so I consider all of the following to be applicable to you.
  • Let Twisted create threads for you. Exiting the frame may cause you problems. There are no “public” IPC messaging APIs with the reactor (I think), so if you use Twisted, you pretty much have to go all the way.
  • By default, Twisted does not switch threads to call your callbacks. To delegate a workflow from the main reactor stream (i.e., perform I / O blocking), you do not need to create the stream yourself, you use reactor.callInThread , and it will work in the workflow. If you do not, everything will work in the main stream of the reactor, which means, for example, any I / O operations block the stream of reactors, and you will not be able to receive any events until the I / O is completed.
  • Encoding in worker threads should use reactor.callFromThread to do something that is not thread safe. Provide a callback that will work in the core of the reactor. You are better safe than sorry, trust me.
  • All of the above applies also to Deferred . So don't be afraid to use partial(reactor.callFromThread, mycallback) or partial(reactor.callInThread, mycallback) instead of just mycallback when setting up callbacks. I learned that this is a difficult path; without it, I found that any blocking I / O that I could do in deferred callbacks was either an error (due to thread problems) or blocking the main thread.

If you are just starting out at Twisted, this is a bit of a “fall in confidence”. Learn to let go of your own threads and pass messages through Queue objects, etc. Once you figure out how Deferred and the operation of the reactor (this is called "Twisted" for some reason!), It will seem completely natural to you. Twisted makes you separate and share problems in the style of functional programming, but as soon as you finish, I find that it is very clean and works well.

One tip: I wrote several decorators to use in all my callback functions, so that I would not have to constantly call callInThread and callFromThread and configure Deferred to handle error callbacks throughout the code; my decorators allow this behavior for me. This probably prevented the mistakes from forgetting about it, and it certainly made me more attractive to me.

+2
source

Source: https://habr.com/ru/post/1492410/


All Articles