UWSGI application with zeromq workers exchanging data

I have a Flask application that links to a backend workflow process on zeromq.

  • Using Flask's toy server works.
  • Deployment to uWSGI using a single web process is fine.
  • Deploying to uWSGI with multiple web processes works during the setup stage, including the communication between every web process and the backend worker.
  • Once the uWSGI processes contact the backend worker during a request, it does not work.

So what's the problem? As soon as there is an exchange of information with the backend worker, the zeromq call is interrupted by a signal and fails with EINTR. This happens repeatedly and consistently - when you try to send or receive 10 times in a row, it is interrupted 10 times.

I cannot find much documentation in uWSGI regarding IPC. Does this need to be done by building uWSGI with the zeromq plugin? There is a hint on the following page, but I can't tell whether I need it or how to use it: http://uwsgi-docs.readthedocs.org/en/latest/Changelog-1.9.21.html?highlight=zmq_init .

So let me rephrase my question:

  • How would you do IPC from a uWSGI web application?
  • Do you have any idea what is going wrong, and which signals zeromq receives, when uWSGI is running in multiprocess mode?

Here is some code to give you an idea.

environment.py

class Environment(Flask):
    """Flask application that owns the CardService link to the backend worker."""

    def __init__(self, db_server_uri=u'http://localhost:5984/', debug=True):
        """Configure the Flask app and create its CardService.

        db_server_uri -- accepted for callers; not used in this excerpt.
        debug         -- stored on the app; when true a start-up banner is
                         printed.
        """
        Flask.__init__(self, __name__, static_folder='static')
        self.config.from_object(__name__)
        self.debug = debug
        if debug:
            print("STARTING ENVIRONMENT IN DEBUG MODE")
        self.secret_key = '...'
        # The service keeps only a weak reference back to this environment.
        self.card_service = CardService(environment_for_logging=self)
        # (...some communication with card service during setup phase, working fine... )
        # (...some stuff happens during request that will call self.card_service.do_some_stuff() ...)

card_service.py

from card_worker import host, work_ports, result_port
class CardService(object):
    """Client used by a web process to hand work to the backend card worker.

    Work is pushed over a PUSH socket bound to one of ``work_ports``; results
    are read from a SUB socket connected to the worker's result port and are
    matched to the request through ``work_id``.

    NOTE(review): the ZeroMQ context and sockets are created in ``__init__``.
    If the instance is built before uWSGI forks its worker processes, the
    sockets end up shared across the fork, which ZeroMQ does not support --
    confirm that the application is loaded once per worker process.
    NOTE(review): ``tear_down`` is called on failure but is not part of this
    excerpt; presumably it closes the sockets -- confirm.
    """

    # How many EINTR-interrupted attempts are tolerated per send / receive.
    MAX_TRIALS = 10

    def __init__(self, environment_for_logging=None):
        """Bind the work socket and connect the result socket.

        environment_for_logging -- optional object with a ``debug`` attribute;
            held through a weak reference and consulted before printing.

        Raises Exception when a socket cannot be bound or connected.
        """
        self.environment_for_logging = None
        if environment_for_logging is not None:
            self.environment_for_logging = weakref.ref(environment_for_logging)
        context = zmq.Context()
        self.work_sender = context.socket(zmq.PUSH)
        port_selected = None
        try:
            # we choose a random port out of four ports since four uwsgi
            # processes will be started.  max_port is exclusive, hence +1 so
            # that the last configured port can be picked as well.
            port_selected = self.work_sender.bind_to_random_port(
                'tcp://*',
                min_port=work_ports[0],
                max_port=work_ports[-1] + 1,
                max_tries=1000
            )
        except zmq.error.ZMQError as e:
            self.tear_down()
            raise Exception(
                "%s: tcp://*:%s-%s" % (e, work_ports[0], work_ports[-1])
            )
        self.result_receiver = context.socket(zmq.SUB)
        result_uri = "%s:%s" % (host, str(result_port))
        try:
            self.result_receiver.connect(result_uri)
            # Empty prefix: subscribe to every published result.
            self.result_receiver.setsockopt(zmq.SUBSCRIBE, b'')
            self._debug(
                "CARDSERVICE IS CONNECTED TO RESULT SOCKET %s" % (result_uri)
            )
        except zmq.error.ZMQError as e:
            self.tear_down()
            raise Exception(str(e) + ": " + str(result_uri))
        self._debug("CARDSERVICE STARTED ON PORT %s" % (str(port_selected)))

    def _debug(self, message):
        """Print ``message`` if the logging environment is alive and in debug mode."""
        if self.environment_for_logging is None:
            return
        environment = self.environment_for_logging()
        # The weak reference yields None once the environment is collected.
        if environment is not None and environment.debug:
            print(message)

    def _get_result(self, work):
        """Send ``work`` to the worker and return the matching result dict.

        A fresh ``work_id`` is stored in ``work`` before sending.  Results
        carrying a different ``work_id`` are skipped.  Sends and receives that
        fail with EINTR are retried up to MAX_TRIALS times each.

        Raises Exception when ``work`` cannot be serialized or when the
        exchange could not complete within the allowed number of tries; other
        ZMQError values are re-raised unchanged.
        """
        self._debug("%s requested from card service" % (work['action']))
        work['work_id'] = str(uuid.uuid4())

        sent = False
        num_trials = 0
        while not sent and num_trials < self.MAX_TRIALS:
            try:
                self.work_sender.send_json(work)
                sent = True
            except zmq.error.ZMQError as e:
                if e.errno != errno.EINTR:
                    raise
                self._debug("zmq_term interrupted by signal, restarting")
                num_trials += 1
            except TypeError as e:
                raise Exception(
                    str(e)
                    + "; Attempted work package to be serialized: "
                    + pprint.pformat(work)
                )
        if not sent:
            # Nothing was sent, so waiting for a result could never succeed.
            raise Exception(
                "Data processing could not complete after %i tries" % (num_trials)
            )

        result = {}
        finished = False
        num_trials = 0
        while not finished and num_trials < self.MAX_TRIALS:
            try:
                result = self.result_receiver.recv_json()
            except zmq.error.ZMQError as e:
                if e.errno != errno.EINTR:
                    raise
                self._debug("zmq_term interrupted by signal, restarting")
                num_trials += 1
                continue
            if result.get('work_id') != work['work_id']:
                # The result socket sees every published result; keep
                # reading until the answer to this request arrives.
                self._debug(
                    "card service has received result with the wrong work id: %s"
                    % (result.get('work_id'))
                )
                continue
            finished = True
        if not finished:
            raise Exception(
                "Data processing could not complete after %i tries" % (num_trials)
            )
        self._debug("%s finished" % (work['action']))
        return result

    def do_some_stuff(self, input):
        """Run the worker's 'do_some_stuff' action on ``input`` and return its result dict."""
        return self._get_result({
            'action': 'do_some_stuff',
            'input': input
        })

card_worker.py (runs in the background)

# Address prefix used to build the worker's socket URIs.
host = 'tcp://127.0.0.1'
# Ports the worker's PULL socket connects to, one URI per port.
work_ports = list(range(6556, 6560))
# Port on which the worker binds its PUB socket for results.
result_port = 7000

class CardWorker(object):
    """Background worker: pulls work packages and publishes their results.

    A PULL socket is connected to every port in ``work_ports`` and a PUB
    socket is bound on ``result_port``.

    NOTE(review): ``tear_down`` is called on failure but is not part of this
    excerpt; presumably it closes the sockets -- confirm.
    """

    def __init__(self, debug=True):
        """Connect the work socket and bind the result socket.

        debug -- when true, progress messages are printed.

        Raises Exception when a socket cannot be connected or bound.
        """
        self.debug = debug
        context = zmq.Context()
        self.receiver = context.socket(zmq.PULL)
        for port in work_ports:
            work_uri = "%s:%s" % (host, str(port))
            try:
                self.receiver.connect(work_uri)
                if debug:
                    print("CARD WORKER CONNECTED TO WORK SOCKET %s" % (work_uri))
            except zmq.error.ZMQError as e:
                self.tear_down()
                raise Exception(str(e) + ": " + str(work_uri))

        self.sender = context.socket(zmq.PUB)
        result_uri = "%s:%s" % (host, str(result_port))
        try:
            self.sender.bind(result_uri)
            if debug:
                print("CARD WORKER HAS ESTABLISHED RESULT SOCKET %s" % (result_uri))
        except zmq.error.ZMQError as e:
            self.tear_down()
            raise Exception(str(e) + ": " + str(result_uri))

    def __del__(self):
        """Change back to ``saved_path`` if one was recorded on the instance."""
        # saved_path is not assigned anywhere in this class, so look it up
        # defensively instead of raising AttributeError during finalization.
        saved_path = getattr(self, 'saved_path', None)
        if saved_path is not None:
            os.chdir(saved_path)

    def start(self):
        """Serve work packages forever; this method does not return.

        Each package is a dict with 'action', 'input' and optionally
        'work_id'.  The published result always echoes the ``work_id``; for
        an action other than 'do_some_stuff' it contains nothing else.
        """
        if self.debug:
            print(u"LISTENING")
        while True:
            work = self.receiver.recv_json()
            payload = work.get('input')
            action = work.get('action')
            work_id = work.get('work_id', '')

            # Start from a fresh dict for every package so an unknown action
            # can neither crash the loop nor re-send the previous result.
            result = {}
            if action == 'do_some_stuff':
                result['output'] = payload[0] + payload[1]
            result['work_id'] = work_id

            if self.debug:
                print(u"SENT:\n" + pprint.pformat(result))
            self.sender.send_json(result)

if __name__ == '__main__':
    # Command-line entry point: -v / --debug switches on the worker's
    # debug output; without the flag options.debug is None.
    option_parser = OptionParser()
    option_parser.add_option("-v", "--debug", dest="debug", action="store_true")
    options, _positional = option_parser.parse_args()

    CardWorker(options.debug).start()

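Result: calling do_some_stuff prints "zmq_term interrupted by signal, restarting" 10 times in a row and then fails. Running uwsgi under strace -f shows the following: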

[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid    71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43

For completeness, this is how uwsgi is started:

uwsgi --http :5000 --module server --callable env --enable-threads --master --processes 4 --honour-stdin --reload-mercy 60

Any ideas what is going wrong?

+4

Source: https://habr.com/ru/post/1524171/


All Articles