Python process hangs due to open Paramiko ssh connections

I use Paramiko to monitor logs on remote machines during a test run.

The monitoring happens in a daemon thread, which essentially does this:

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(self.host)  # connect first; this line was omitted from my excerpt
    transport = ssh.get_transport()
    channel = transport.open_session()
    channel.exec_command('sudo tail -f ' + self.logfile)
    last_partial = ''
    while not self.stopped.isSet():
        try:
            if None == select or None == channel:
                break
            rl, wl, xl = select.select([channel], [], [], 1.0)
            if None == rl:
                break
            if len(rl) > 0:
                # Must be stdout, how can I check?
                line = channel.recv(1024)
            else:
                time.sleep(1.0)
                continue
        except:
            break
        if line:
            # handle saving the line... lines are 'merged' so that one
            # log is made from all the sources
            pass
    ssh.close()

I had problems with a blocking read, so I moved to this select()-based approach, and most of the time it works fine. I think I run into problems when the network is slow.

Sometimes I see this error at the end of the run (after self.stopped is set above). I tried sleeping after stopping the test and joining all the monitor threads, but the hang can still occur.

    Exception in thread Thread-9 (most likely raised during interpreter shutdown):
    Traceback (most recent call last):
      File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
      File "/usr/lib/python2.6/site-packages/paramiko/transport.py", line 1470, in run
    <type 'exceptions.AttributeError'>: 'NoneType' object has no attribute 'error'

I believe the error comes from this section of Paramiko's transport.py; line 1470 is marked with <<< below.

                    self._channel_handler_table[ptype](chan, m)
                elif chanid in self.channels_seen:
                    self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid)
                else:
                    self._log(ERROR, 'Channel request for unknown channel %d' % chanid)
                    self.active = False
                    self.packetizer.close()
            elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table):
                self.auth_handler._handler_table[ptype](self.auth_handler, m)
            else:
                self._log(WARNING, 'Oops, unhandled type %d' % ptype)
                msg = Message()
                msg.add_byte(cMSG_UNIMPLEMENTED)
                msg.add_int(m.seqno)
                self._send_message(msg)
    except SSHException as e:
        self._log(ERROR, 'Exception: ' + str(e))
        self._log(ERROR, util.tb_strings())  # <<<<<<<<<<<<<<<<<<<<<<<<<<< line 1470
        self.saved_exception = e
    except EOFError as e:
        self._log(DEBUG, 'EOF in transport thread')
        #self._log(DEBUG, util.tb_strings())
        self.saved_exception = e
    except socket.error as e:
        if type(e.args) is tuple:
            if e.args:
                emsg = '%s (%d)' % (e.args[1], e.args[0])
            else:
                # empty tuple, e.g. socket.timeout
                emsg = str(e) or repr(e)
        else:
            emsg = e.args
        self._log(ERROR, 'Socket exception: ' + emsg)
        self.saved_exception = e
    except Exception as e:
        self._log(ERROR, 'Unknown exception: ' + str(e))
        self._log(ERROR, util.tb_strings())

When the run is hung, I can run sudo lsof -i -n | egrep '\<ssh\>' and see that the ssh connections really are still there (apparently stuck). My main test process is PID 15010.

    sshd    6478   root   3u  IPv4   46405  0t0  TCP *:ssh (LISTEN)
    sshd    6478   root   4u  IPv6   46407  0t0  TCP *:ssh (LISTEN)
    sshd    14559  root   3r  IPv4 3287615  0t0  TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED)
    sshd    14563  cmead  3u  IPv4 3287615  0t0  TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED)
    python  15010  root  12u  IPv4 3291525  0t0  TCP 172.16.0.171:43227->172.16.0.142:ssh (ESTABLISHED)
    python  15010  root  15u  IPv4 3291542  0t0  TCP 172.16.0.171:41928->172.16.0.227:ssh (ESTABLISHED)
    python  15010  root  16u  IPv4 3291784  0t0  TCP 172.16.0.171:57682->172.16.0.48:ssh (ESTABLISHED)
    python  15010  root  17u  IPv4 3291779  0t0  TCP 172.16.0.171:43246->172.16.0.142:ssh (ESTABLISHED)
    python  15010  root  20u  IPv4 3291789  0t0  TCP 172.16.0.171:41949->172.16.0.227:ssh (ESTABLISHED)
    python  15010  root  65u  IPv4 3292014  0t0  TCP 172.16.0.171:51886->172.16.0.226:ssh (ESTABLISHED)
    sshd    15106  root   3r  IPv4 3292962  0t0  TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)
    sshd    15110  cmead  3u  IPv4 3292962  0t0  TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)

So, I just want my process not to hang. Also, I'd rather not update Paramiko if that means upgrading Python past 2.6.6, because I'm on CentOS and, from what I've read, going past 2.6.6 there can be "complicated".

Thanks for any ideas.


Reply to shavenwarthog, too long for a comment:

Hi, thanks for the reply. A couple of quick questions. 1) What if I need to stop the threads at an unknown time? In other words, the tail -f blah.log threads will run for, say, 3 minutes, and I may want to check the accumulated data 10 times during those three minutes. 2) Sort of; I think when I tried this with some actual remote machines it would not exit (since tail -f never exits). I had forgotten about that, but I think non-blocking reads should solve it. Do you think the other thread you commented on, plus this one, is enough to make this work? Basically: use my non-blocking read to collect data locally for each runner's stream, then only lock when the main thread wants to pull the data from each runner, which effectively splits my one lock into, say, 10 locks, and that should help. Does this make sense?

1 answer

The following code runs a command on multiple hosts. As each command produces data, it is printed to the screen.

The general form is adapted from Alex Martelli's code. This version adds more logging, including a human-readable label for each connection's host.

The original code was written for commands that run and then exit. I changed it to print data incrementally, as it becomes available. Previously, the first thread to grab the lock would block on read() and all the other threads would starve; the new version gets around this.

EDIT: some notes:

Stopping the program at an arbitrary later time puts us in a rather sticky situation. The threads are uninterruptible, so we can't just install a signal handler that calls sys.exit(). The updated code exits safely after 3 seconds by using a while loop to join() each thread with a short timeout. In real code, the threads should also exit cleanly when the parent does. Pay careful attention to the two WARNINGs in the code, since the signal / exit / thread interaction is pretty delicate.

The code processes the data as it arrives; right now it simply prints it to the console. It does not use non-blocking reads, because 1) non-blocking code is much more complicated, and 2) the original program did not process the child threads' data in the parent. With threads, it is easiest to have each child write directly to a file, database, or network service. For anything more complicated, use multiprocessing, which makes it much simpler to farm out lots of tasks and restart them if they die. That library also lets you spread the load across multiple processors, which threads (because of the GIL) cannot.

Good luck

EDIT #2

Note that it is possible, and perhaps preferable, to run multiple commands without using threading or multiprocessing at all. TL;DR: use Popen and a select() loop to process the output in batches. See the Pastebin code example: Run Multiple Commands Without Threads / Multiprocessing.

Source

    # adapted from /questions/393355/creating-multiple-ssh-connections-at-a-time-using-paramiko
    import signal, sys, threading
    import paramiko

    CMD = 'tail -f /var/log/syslog'

    def signal_cleanup(_signum, _frame):
        print '\nCLEANUP\n'
        sys.exit(0)

    def workon(host):
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(host)
        _stdin, stdout, _stderr = ssh.exec_command(CMD)
        for line in stdout:
            print threading.current_thread().name, line,

    def main():
        hosts = ['localhost', 'localhost']

        # exit after a few seconds (see WARNINGs)
        signal.signal(signal.SIGALRM, signal_cleanup)
        signal.alarm(3)

        threads = [
            threading.Thread(
                target=workon, args=(host,),
                name='host #{}'.format(num+1)
            )
            for num, host in enumerate(hosts)
        ]
        print 'starting'
        for t in threads:
            # WARNING: daemon=True allows program to exit when main proc
            # does; otherwise we'll wait until all threads complete.
            t.daemon = True
            t.start()

        print 'joining'
        for t in threads:
            # WARNING: t.join() is uninterruptible; this while loop allows
            # signals
            # see: http://snakesthatbite.blogspot.com/2010/09/cpython-threading-interrupting.html
            while t.is_alive():
                t.join(timeout=0.1)
        print 'done!'

    if __name__ == '__main__':
        main()

Output

    starting
    joining
    host #2 Jun 27 16:28:25 palabras kernel: [158950.369443] ideapad_laptop: Unknown event: 1
    host #2 Jun 27 16:29:12 palabras kernel: [158997.098833] ideapad_laptop: Unknown event: 1
    host #1 Jun 27 16:28:25 palabras kernel: [158950.369443] ideapad_laptop: Unknown event: 1
    host #1 Jun 27 16:29:12 palabras kernel: [158997.098833] ideapad_laptop: Unknown event: 1
    host #1 Jun 27 16:29:36 palabras kernel: [159020.809748] ideapad_laptop: Unknown event: 1

Source: https://habr.com/ru/post/970900/
