How to join the stage of two subprocesses and pipe to stdin of a new subprocess in python

Suppose I had the following command launched from a shell

{ samtools view -HS header.sam; # command1 samtools view input.bam 1:1-50000000; # command2 } | samtools view -bS - > output.bam # command3 

For those of you who are not familiar with the samtools view (since it is stackoverflow). Basically this is creating a new bam file with a new header. Bam files are usually large compressed files, so even going through a file in some cases can take a lot of time. One alternative approach would be to go through command2 and then use the samtools utility to switch the header. This goes through a large file twice. The above command goes through bam at a time, which is good for large bam files (they become larger than 20 GB even when compressed - WGS).

My question is how to implement commands of this type in python using a subprocess.

I have the following:

 fh_bam = open('output.bam', 'w') params_0 = [ "samtools", "view", "-HS", "header.sam" ] params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"] params_2 = [ "samtools", "view", "-bS", "-" ] sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE) sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE) ### SOMEHOW APPEND sub_1.stdout to sub_0.stdout sub_2 = subprocess.Popen(params_2, stdin=appended.stdout, stdout=fh_bam) 

Any help is appreciated. Thanks.

+6
source share
4 answers

If you already have a shell command in the line, you can simply run it as it is:

 #!/usr/bin/env python from subprocess import check_call check_call(r""" { samtools view -HS header.sam; # command1 samtools view input.bam 1:1-50000000; # command2 } | samtools view -bS - > output.bam # command3 """, shell=True) 

To emulate a pipeline in Python:

 #!/usr/bin/env python from subprocess import Popen, PIPE # start command3 to get stdin pipe, redirect output to the file with open('output.bam', 'wb', 0) as output_file: command3 = Popen("samtools view -bS -".split(), stdin=PIPE, stdout=output_file) # start command1 with its stdout redirected to command3 stdin command1 = Popen('samtools view -HS header.sam'.split(), stdout=command3.stdin) rc_command1 = command1.wait() #NOTE: command3.stdin is not closed, no SIGPIPE or a write error if command3 dies # start command2 after command1 finishes command2 = Popen('samtools view input.bam 1:1-50000000'.split(), stdout=command3.stdin) command3.stdin.close() # inform command2 if command3 dies (SIGPIPE or a write error) rc_command2 = command2.wait() rc_command3 = command3.wait() 
+4
source

(I cannot comment sadly, but this β€œanswer” is a comment on cmidi's answer, if someone can move it, it will be appreciated! - PS: this answer has been deleted ...)

Marco explicitly said that teams produce a lot of output, about 20 GB. If you use communication (), it will wait for the process to complete, which means that the "fd" handle will have to store this large amount of data. In practice, the OS will fill the data to disk in the meantime, if your computer does not have more than 20 GB of free memory. Thus, you end up writing intermediate data to disk that the author wanted to avoid. +1 for sirlark answer!
+1
source

I assume that concatenation of outputs from the first two subprocesses in memory is not possible due to the size of the involved files. I would suggest wrapping the outputs of the first two subprocesses in a file like. It looks like you only need a read method, as popen will only read from its stdin file, not search or write. The code below assumes that returning an empty string from a read is sufficient to indicate that the stream is in EOF

 class concat(object): def __init__(self, f1, f2): self.f1 = f1 self.f2 = f2 def read(self, *args): ret = self.f1.read(*args) if ret == '': ret = self.f2.read(*args) return ret fh_bam = open('output.bam', 'w') params_0 = [ "samtools", "view", "-HS", "header.sam" ] params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"] params_2 = [ "samtools", "view", "-bS", "-" ] sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE) sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE) ### Somehow Append sub_1.stdout to sub_0.stdout sub_2 = subprocess.Popen(params_2, stdin=concat(sub_0.stdout, sub_1.stdout), stdout=fh_bam) 

To clarify, f1.read will block and return '' only when the channel is closed / EOF 'd. concat.read only tries to read f2 after this, so the output from f1 and f2 will not be intertwined. Of course, there is little overhead for repeating the end of f1 , which can be prevented by setting a flag variable to indicate which file to read from. I doubt it will significantly improve performance.

0
source

While Popen accepts file objects, it actually uses the basic file descriptors / descriptors, rather than the methods of reading and writing file objects for communication, as @JF Sebastian rightly points out. The best way to do this is to use a pipe ( os.pipe() ) that does not use the disk. This allows you to connect the output stream directly to the input stream of another process that you want. The problem is serialization only, to make sure the two source streams are not interleaved.

 import os import subprocess r, w = os.pipe() fh_bam = open('output.bam', 'w') params_0 = [ "samtools", "view", "-HS", "header.sam" ] params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"] params_2 = [ "samtools", "view", "-bS", "-" ] sub_sink = subprocess.Popen(params_2, stdin=r, stdout=fh_bam, bufsize=4096) sub_src1 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=w, bufsize=4096) sub_src1.communicate() sub_src2 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=w, bufsize=4096) sub_src2.communicate() 

First, we open the sink (pipe reader), and then communicate with the source processes only to avoid potential blocking, as mentioned in @Ariel. It also causes the first source process to complete and wash out its output through the pipe before the second source process can write to the pipe, preventing interleaving / interrupted output. You can play with bufsize to tune performance.

This is almost the same as the shell command.

-1
source

Source: https://habr.com/ru/post/983130/


All Articles