I have a Flask application that talks to a backend worker process over ZeroMQ.
- Running it on Flask's built-in development server works.
- Deployment to uWSGI using a single web process is fine.
- Deploying to uWSGI with multiple web processes also works during the setup stage, including communication between every web process and the backend worker.
- However, once the uWSGI processes contact the backend worker while handling a request, it stops working.
So what is the problem? As soon as any data is exchanged with the backend worker, the zeromq call fails with EINTR (interrupted by a signal). This happens repeatedly and consistently: if I retry the send or receive 10 times in a row, it is interrupted all 10 times.
I cannot find much uWSGI documentation about IPC. Do I need to use the zeromq plugin for this? There is a hint on the following page, but I can't tell whether I need it or how to use it: http://uwsgi-docs.readthedocs.org/en/latest/Changelog-1.9.21.html?highlight=zmq_init .
So let me rephrase my question:
- How would you do IPC from a uWSGI web application?
- Do you have any idea what goes wrong, i.e. which signals zeromq receives when uWSGI runs in multi-process mode?
Here is some code to give you an idea.
environment.py
class Environment(Flask):
    """Flask application object that also owns the CardService client.

    :param db_server_uri: accepted for compatibility; not used here.
    :param debug: stored as the Flask ``debug`` flag and enables the
        startup banner.
    """

    def __init__(self, db_server_uri=u'http://localhost:5984/', debug=True):
        super(Environment, self).__init__(__name__, static_folder='static')
        self.config.from_object(__name__)
        self.debug = debug
        if debug:
            print("STARTING ENVIRONMENT IN DEBUG MODE")
        self.secret_key = '...'
        # NOTE(review): the CardService (and its zmq sockets) is created
        # together with the app object.  If the app is built before uWSGI
        # forks its workers (uWSGI's default), every worker inherits the same
        # sockets -- consider ``--lazy-apps``; confirm against deployment.
        self.card_service = CardService(environment_for_logging=self)
card_service.py
from card_worker import host, work_ports, result_port
class CardService(object):
    """Web-side client for the card worker.

    Each instance owns a zmq context with two sockets:

    * ``work_sender`` -- a PUSH socket bound to one port taken from
      ``work_ports``; the card worker connects its PULL socket to every
      port in that list, so ``work_ports`` needs at least one port per web
      process that builds a CardService.
    * ``result_receiver`` -- a SUB socket connected to the worker's PUB
      socket at ``host:result_port`` and subscribed to every message.

    Because results are published to every subscriber, each request is
    tagged with a fresh ``work_id`` and replies carrying other ids are
    skipped.

    NOTE(review): zmq contexts and sockets must not be used across
    ``fork()``.  By default uWSGI loads the application once and then forks
    the worker processes, so a CardService created while the app is built
    would be shared by every worker; this is the likely cause of the
    repeated EINTR failures.  Starting uWSGI with ``--lazy-apps`` (or
    building the service in a post-fork hook) gives each worker its own
    instance -- confirm against the deployment.
    """

    def __init__(self, environment_for_logging=None):
        """Create the context and both sockets.

        :param environment_for_logging: optional object with a ``debug``
            attribute (the Flask ``Environment``).  It is held through a weak
            reference so this service does not keep the app alive.
        :raises Exception: if no work port can be bound or the result socket
            cannot be connected; sockets created so far are closed first.
        """
        self.environment_for_logging = None
        if environment_for_logging is not None:
            self.environment_for_logging = weakref.ref(environment_for_logging)
        self.work_sender = None
        self.result_receiver = None
        self.context = zmq.Context()

        min_port = work_ports[0]
        max_port = work_ports[-1]
        self.work_sender = self.context.socket(zmq.PUSH)
        try:
            # pyzmq treats max_port as exclusive, so +1 keeps the last entry
            # of work_ports usable (assumes the ports are contiguous).
            port_selected = self.work_sender.bind_to_random_port(
                'tcp://*',
                min_port=min_port,
                max_port=max_port + 1,
                max_tries=1000,
            )
        except zmq.error.ZMQError as e:
            self.tear_down()
            raise Exception("%s: tcp://*:%s-%s" % (e, min_port, max_port))

        self.result_receiver = self.context.socket(zmq.SUB)
        result_uri = "%s:%s" % (host, result_port)
        try:
            self.result_receiver.connect(result_uri)
            # Subscribe to everything; the option value must be bytes.
            self.result_receiver.setsockopt(zmq.SUBSCRIBE, b'')
        except zmq.error.ZMQError as e:
            self.tear_down()
            raise Exception(str(e) + ": " + str(result_uri))
        self._debug_print("CARDSERVICE IS CONNECTED TO RESULT SOCKET %s" % result_uri)
        self._debug_print("CARDSERVICE STARTED ON PORT %s" % port_selected)

    def tear_down(self):
        """Close both sockets (if created) and terminate the context."""
        for attr in ('work_sender', 'result_receiver'):
            sock = getattr(self, attr, None)
            if sock is not None:
                sock.close(linger=0)
                setattr(self, attr, None)
        context = getattr(self, 'context', None)
        if context is not None:
            context.term()
            self.context = None

    def _debug_enabled(self):
        """Return True when the logging environment is alive and in debug mode."""
        if self.environment_for_logging is None:
            return False
        environment = self.environment_for_logging()
        # The weak reference yields None once the environment is collected.
        return environment is not None and bool(environment.debug)

    def _debug_print(self, message):
        """Print ``message`` only when debug output is enabled."""
        if self._debug_enabled():
            print(message)

    def _get_result(self, work, max_tries=10):
        """Send ``work`` to the card worker and block until its result arrives.

        ``work`` is mutated: a fresh ``work_id`` is added before sending.

        :param work: JSON-serialisable dict with at least an ``action`` key.
        :param max_tries: how many EINTR interruptions to tolerate for the
            send and for the receive, separately.
        :returns: the result dict whose ``work_id`` matches the request.
        :raises Exception: if ``work`` is not serialisable, or the send or
            receive is interrupted ``max_tries`` times.
        """
        self._debug_print("%s requested from card service" % work['action'])
        work['work_id'] = str(uuid.uuid4())

        sent = False
        num_trials = 0
        while not sent and num_trials < max_tries:
            try:
                self.work_sender.send_json(work)
                sent = True
            except zmq.error.ZMQError as e:
                if e.errno != errno.EINTR:
                    raise
                self._debug_print("send interrupted by signal, retrying")
                num_trials += 1
            except TypeError as e:
                raise Exception(str(e) + "; Attempted work package to be serialized: " + pprint.pformat(work))
        if not sent:
            # Without this, we would block forever waiting for a reply to a
            # request that never left the process.
            raise Exception("Work package could not be sent after %i tries" % num_trials)

        result = {}
        received = False
        num_trials = 0
        while not received and num_trials < max_tries:
            try:
                result = self.result_receiver.recv_json()
            except zmq.error.ZMQError as e:
                if e.errno != errno.EINTR:
                    raise
                self._debug_print("recv interrupted by signal, retrying")
                num_trials += 1
                continue
            if result.get('work_id') != work['work_id']:
                # Every subscriber sees every published result, so replies to
                # other requests are expected; skip them without counting a try.
                self._debug_print("card service has received result with the wrong work id: %s" % result.get('work_id'))
                continue
            received = True
        if not received:
            raise Exception("Data processing could not complete after %i tries" % num_trials)
        self._debug_print("%s finished" % work['action'])
        return result

    def do_some_stuff(self, input):
        """Ask the card worker to run ``do_some_stuff`` on ``input``.

        :returns: the result dict published by the card worker for this
            request (previously computed and then discarded).
        """
        return self._get_result({
            'action': 'do_some_stuff',
            'input': input,
        })
card_worker.py (runs in the background)
# Endpoint shared by the card worker and CardService.
host = "tcp://127.0.0.1"
# CardService binds one of these ports; CardWorker connects to all of them.
work_ports = list(range(6556, 6560))
# CardWorker publishes results here; CardService subscribes to it.
result_port = 7000
class CardWorker(object):
    """Background worker that executes work packages for CardService.

    PULLs JSON work packages from every port in ``work_ports`` and PUBlishes
    one result per package on ``host:result_port``, tagged with the
    package's ``work_id`` so the requesting client can pick it out.
    """

    def __init__(self, debug=True):
        """Connect the work socket and bind the result socket.

        :param debug: print connection and per-result diagnostics.
        :raises Exception: if a socket operation fails; sockets created so
            far are closed first.
        """
        self.debug = debug
        self.receiver = None
        self.sender = None
        self.context = zmq.Context()

        self.receiver = self.context.socket(zmq.PULL)
        for port in work_ports:
            work_uri = "%s:%s" % (host, port)
            try:
                self.receiver.connect(work_uri)
            except zmq.error.ZMQError as e:
                self.tear_down()
                raise Exception(str(e) + ": " + str(work_uri))
            if debug:
                print("CARD WORKER CONNECTED TO WORK SOCKET %s" % work_uri)

        self.sender = self.context.socket(zmq.PUB)
        result_uri = "%s:%s" % (host, result_port)
        try:
            self.sender.bind(result_uri)
        except zmq.error.ZMQError as e:
            self.tear_down()
            raise Exception(str(e) + ": " + str(result_uri))
        if debug:
            print("CARD WORKER HAS ESTABLISHED RESULT SOCKET %s" % result_uri)

    def tear_down(self):
        """Close both sockets (if created) and terminate the context."""
        for attr in ('receiver', 'sender'):
            sock = getattr(self, attr, None)
            if sock is not None:
                sock.close(linger=0)
                setattr(self, attr, None)
        context = getattr(self, 'context', None)
        if context is not None:
            context.term()
            self.context = None

    def __del__(self):
        # ``saved_path`` is never assigned by this class; the unguarded
        # os.chdir raised AttributeError on every destruction.
        saved_path = getattr(self, 'saved_path', None)
        if saved_path is not None:
            os.chdir(saved_path)

    def handle_work(self, work):
        """Compute the result dict for one work package.

        Always returns a dict carrying the package's ``work_id`` (``''`` if
        absent) so the waiting client is answered.  It contains ``output`` on
        success, or ``error`` for an unknown action or unusable input.
        """
        action = work.get('action')
        work_input = work.get('input')
        result = {'work_id': work.get('work_id', '')}
        if action == 'do_some_stuff':
            try:
                result['output'] = work_input[0] + work_input[1]
            except (TypeError, IndexError, KeyError) as e:
                result['error'] = str(e)
        else:
            result['error'] = "unknown action: %r" % (action,)
        return result

    def start(self):
        """Serve work packages forever, publishing one result per package."""
        if self.debug:
            print(u"LISTENING")
        while True:
            work = self.receiver.recv_json()
            result = self.handle_work(work)
            if self.debug:
                print(u"SENT:\n" + pprint.pformat(result))
            self.sender.send_json(result)
if __name__ == '__main__':
    # Command line: ``python card_worker.py [-v|--debug]``.
    cli = OptionParser()
    cli.add_option("-v", "--debug", action="store_true", dest="debug")
    opts = cli.parse_args()[0]
    CardWorker(opts.debug).start()
Update: every call to do_some_stuff prints "zmq_term interrupted by signal, restarting" repeatedly (about 10 times). Running uwsgi under strace -f shows:
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
[pid 71] write(1, "zmq_term interrupted by signal, "..., 43zmq_term interrupted by signal, restarting) = 43
I start uwsgi with the following command:
uwsgi --http :5000 --module server --callable env --enable-threads --master --processes 4 --honour-stdin --reload-mercy 60
Any ideas what is going on?