Applications with an intensive serial input-output port: transfer from streaming, building on the basis of a queue for asynchronous (ala Twisted)

So, I was working on an application for a client that communicates with wireless devices through Serial (RS-232) "Master". Currently, I have written the main part of the application using streams (below). I noticed on #python that the consensus does NOT seem to use threads and uses the avoid capabilities of asynchronous communication.

I could not find good examples of using transcoded for asynchronous input / output for serial port. However, I found the Dave Peticolas 'Twisted Introduction' (thanks to nosklo) I'm working on right now, but uses sockets instead of serial communication (but the concept is asynchronously definitely very well explained).

How do I port this app to Twisted from Threading, Queues? Are there any advantages / disadvantages (I noticed that sometimes, if a thread freezes, it will be a BSOD system)?

Code (msg_poller.py)

from livedatafeed import LiveDataFeed from msg_build import build_message_to_send from utils import get_item_from_queue from protocol_wrapper import ProtocolWrapper, ProtocolStatus from crc16 import * import time import Queue import threading import serial import gc gc.enable() PROTOCOL_HEADER = '\x01' PROTOCOL_FOOTER = '\x0D\x0A' PROTOCOL_DLE = '\x90' INITIAL_MODBUS = 0xFFFF class Poller: """ Connects to the serial port and polls nodes for data. Reads response from node(s) and loads that data into queue. Parses qdata and writes that data to database. """ def __init__(self, port, baudrate, parity, rtscts, xonxoff, echo=False): try: self.serial = serial.serial_for_url(port, baudrate, parity=parity, rtscts=rtscts, xonxoff=xonxoff, timeout=.01) except AttributeError: self.serial = serial.Serial(port, baudrate, parity=parity, rtscts=rtscts, xonxoff=xonxoff, timeout=.01) self.com_data_q = None self.com_error_q = None self.livefeed = LiveDataFeed() self.timer = time.time() self.dtr_state = True self.rts_state = True self.break_state = False def start(self): self.data_q = Queue.Queue() self.error_q = Queue.Queue() com_error = get_item_from_queue(self.error_q) if com_error is not None: print 'Error %s' % (com_error) self.timer = time.time() self.alive = True # start monitor thread # self.mon_thread = threading.Thread(target=self.reader) self.mon_thread.setDaemon(1) self.mon_thread.start() # start sending thread # self.trans_thread = threading.Thread(target=self.writer) self.trans_thread.setDaemon(1) self.trans_thread.start() def stop(self): try: self.alive = False self.serial.close() except (KeyboardInterrupt, SystemExit): self.alive = False def reader(self): """ Reads data from the serial port using self.mon_thread. Displays that data on the screen. """ from rmsg_format import message_crc, message_format while self.alive: try: while self.serial.inWaiting() != 0: # Read node data from the serial port. Data should be 96B. data = self.serial.read(96) data += self.serial.read(self.serial.inWaiting()) if len(data) > 0: # Put data in to the data_q object self.data_q.put(data) if len(data) == 96: msg = self.data_q.get() pw = ProtocolWrapper( header=PROTOCOL_HEADER, footer=PROTOCOL_FOOTER, dle=PROTOCOL_DLE) status = map(pw.input, msg) if status[-1] == ProtocolStatus.IN_MSG: # Feed all the bytes of 'msg' sequentially into pw.input # Parse the received CRC into a 16-bit integer rec_crc = message_crc.parse(msg[-4:]).crc # Compute the CRC on the message calc_crc = calcString(msg[:-4], INITIAL_MODBUS) from datetime import datetime ts = datetime.now().strftime('%Y/%m/%d %H:%M:%S') if rec_crc != calc_crc: print ts print 'ERROR: CRC Mismatch' print msg.encode('hex') else: #msg = message_format.parse(msg[1:]) #print msg.encode('hex') + "\r\n" msg = message_format.parse(msg[1:]) print msg #return msg gc.collect() time.sleep(.2) except (KeyboardInterrupt, SystemExit, Exception, TypeError): self.alive = False self.serial.close() raise def writer(self): """ Builds the packet to poll each node for data. Writes that data to the serial port using self.trans_thread """ import time try: while self.alive: try: dest_module_code = ['DRILLRIG', 'POWERPLANT', 'GENSET', 'MUDPUMP'] dest_ser_no = lambda x: x + 1 for code in dest_module_code: if code != 'POWERPLANT': msg = build_message_to_send( data_len=0x10, dest_module_code='%s' % (code), dest_ser_no=dest_ser_no(0), dest_customer_code='*****', ret_ser_no=0x01, ret_module_code='DOGHOUSE', ret_customer_code='*****', command='POLL_NODE', data=[]) self.serial.write(msg) time.sleep(.2) gc.collect() elif code == 'POWERPLANT': msg = build_message_to_send( data_len=0x10, dest_module_code='POWERPLANT', dest_ser_no=dest_ser_no(0), dest_customer_code='*****', ret_ser_no=0x01, ret_module_code='DOGHOUSE', ret_customer_code='*****', command='POLL_NODE', data=[]) self.serial.write(msg) time.sleep(.2) gc.collect() msg = build_message_to_send( data_len=0x10, dest_module_code='POWERPLANT', dest_ser_no=dest_ser_no(1), dest_customer_code='*****', ret_ser_no=0x01, ret_module_code='DOGHOUSE', ret_customer_code='*****', command='POLL_NODE', data=[]) self.serial.write(msg) time.sleep(.2) gc.collect() except (KeyboardInterrupt, SystemExit): self.alive = False self.serial.close() raise except (KeyboardInterrupt, SystemExit): self.alive = False self.serial.close() raise def main(): poller = Poller( port='COM4', baudrate=115200, parity=serial.PARITY_NONE, rtscts=0, xonxoff=0, ) poller.start() poller.reader() poller.writer() poller.stop() if __name__ == '__main__': main() 
+4
source share
1 answer

It is very difficult (if not impossible) to write a direct one-to-one matching program between the approach with the thread / queue and the one that uses twisted.

I would suggest that you can use the twisted and reactor way of using the protocol and specific protocol methods. Think about this, since all asynchronous things that you explicitly encoded using threads and queues are provided to you for free when you use deferred ones using twisted ones.

twisted, it seems, supports SerialPort over this reactor using the SerialPort transport class, and the basic structure looks something like this.

 from twisted.internet import reactor from twisted.internet.serialport import SerialPort SerialPort(YourProtocolClass(), Port, reactor, baudrate=baudrate)) reactor.run() 

In YourProtocolClass (), you will handle various events specific to your serial port requirements. The doc / core / examples directory contains examples such as gpsfix.py and mouse.py.

+7
source

Source: https://habr.com/ru/post/1336011/


All Articles