Peeking into a Popen pipe stream in Python

Background:
Python 2.6.6 on Linux. This is the first step of a DNA sequence analysis pipeline.
I want to read a possibly gzipped file from mounted remote storage (LAN). If it is gzipped, I gunzip it to a stream (i.e. using gunzip FILENAME -c). If the first character of the stream (file) is "@", I pipe the entire stream into a filter program that takes its input on standard input; otherwise I write it directly to a file on the local drive. I would like to minimize the number of reads/seeks on the remote storage (surely a single pass through the file should be possible?).

The contents of the sample input file, the first four lines corresponding to one record in FASTQ format:

    @I328_1_FC30MD2AAXX:8:1:1719:1113/1
    GTTATTATTATAATTTTTTACCGCATTTATCATTTCTTCTTTATTTTCATATTGATAATAAATATATGCAATTCG
    +I328_1_FC30MD2AAXX:8:1:1719:1113/1
    hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhahhhhhhfShhhYhhQhh]hhhhffhU\UhYWc

Files that should not be sent to the filtering program contain entries that look like this (first two lines corresponding to one entry in FASTA format):

    >I328_1_FC30MD2AAXX:8:1:1719:1113/1
    GTTATTATTATAATTTTTTACCGCATTTATCATTTCTTCTTTATTTTCATATTGATAATAAATATATGCAATTCG

Here is an attempt at some semi-pseudocode to visualize what I want to do (I know this is not possible as written). I hope it makes sense:

    if gzipped:
        gunzip = Popen(["gunzip", "-c", "remotestorage/file.gz"], stdout=PIPE)
        if gunzip.stdout.peek(1) == "@":  # This isn't possible
            fastq = True
        else:
            fastq = False
    if fastq:
        filter = Popen(["filter", "localstorage/outputfile.fastq"], stdin=gunzip.stdout).communicate()
    else:
        # Send the gunzipped stream to another file

Never mind that the code will not run as written here and that it has no error handling etc.; all of that is already in my other code. I just need help peeking into the stream or finding a way around it. It would be nice if one could call gunzip.stdout.peek(1), but I understand that this is not possible.

What I have tried so far:
I figured subprocess.Popen might help me achieve this, and I have tried many different ideas. Among others, I tried using some kind of io.BufferedRandom() object to write the stream to, but I cannot figure out how that would work. I know that streams are not seekable, but perhaps a workaround could be to read the first character of the gunzip stream and then create a new stream into which you first write "@" or ">" depending on the contents of the file, and then feed the rest of gunzip.stdout into the new stream. This new stream would then be fed into the filter's Popen stdin.

Please note that the file size can be several times larger than the available memory. I do not want to read the source file from remote storage more than once, and I want to avoid unnecessary file accesses.

Any ideas are welcome! Please ask if anything needs clarifying.

2 answers

Here is an implementation of your suggestion: first write "@" or ">" depending on the contents of the file, then append the rest of gunzip.stdout to the new stream. I have only tested the local-file branch, but that is enough to demonstrate the concept.

    from subprocess import Popen, PIPE

    if gzipped:
        source = Popen(["gunzip", "-c", "remotestorage/file.gz"], stdout=PIPE)
    else:
        source = Popen(["cat", "remotestorage/file"], stdout=PIPE)

    # Python 2 Popen pipes are unbuffered by default, so this consumes exactly one byte.
    firstchar = source.stdout.read(1)
    # "unread" the char we've just read
    source = Popen([r"(printf '\x%02x' && cat)" % ord(firstchar)],
                   shell=True, stdin=source.stdout, stdout=PIPE)

    # Now feed the output to a filter or to a local file.
    flocal = None
    try:
        if firstchar == "@":
            filter = Popen(["filter", "localstorage/outputfile.fastq"],
                           stdin=source.stdout)
        else:
            flocal = open('localstorage/outputfile.stream', 'w')
            filter = Popen(["cat"], stdin=source.stdout, stdout=flocal)
        filter.communicate()
    finally:
        if flocal is not None:
            flocal.close()

The idea is to read one character from the output of the original command and then recreate the original output with (printf '\xhh' && cat), effectively implementing peek. The replacement stream passes shell=True to Popen, leaving the heavy lifting to the shell and cat. The data always stays in the pipeline and is never read completely into memory. Note that the shell is only invoked for the single Popen call that re-emits the peeked byte, not for the calls that involve user-supplied file names. Even there, the byte is escaped to hexadecimal to ensure that the shell does not mangle it when calling printf.

The code can be cleaned up further into an actual function called peek that returns the peeked contents together with a replacement new_source.
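
One possible shape for such a peek function is sketched below. It is an illustration, not the code from this answer, and it rests on a few assumptions: it targets Python 3, where the source process has to be started with bufsize=0 so that read(1) takes exactly one byte from the pipe (Python 3 buffers pipe reads by default and would otherwise swallow more); it re-emits the byte with an octal printf escape instead of \x, since octal is the escape form that POSIX specifies for printf; and the producer command in the demo is only a stand-in for gunzip -c.

```python
import subprocess
import sys


def peek(source):
    """Return (first_byte, new_source) for a Popen started with
    stdout=PIPE and bufsize=0. new_source.stdout replays the whole stream."""
    first = source.stdout.read(1)          # exactly one byte, pipe is unbuffered
    if not first:                          # empty stream: nothing to unread
        return first, source
    # Re-emit the peeked byte, then let cat forward the rest of the pipe.
    unread = subprocess.Popen(
        "printf '\\%03o' && cat" % ord(first),
        shell=True, stdin=source.stdout, stdout=subprocess.PIPE)
    source.stdout.close()                  # the child now owns the read end
    return first, unread


# Demo: a child Python process stands in for "gunzip -c remotestorage/file.gz".
producer = subprocess.Popen(
    [sys.executable, "-c", "print('@read1'); print('ACGT')"],
    stdout=subprocess.PIPE, bufsize=0)
first, new_source = peek(producer)
data = new_source.stdout.read()            # in real use, pass this pipe to the filter
new_source.wait()
producer.wait()
```

In real use, new_source.stdout would be handed to the next Popen as stdin instead of being read in Python.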


It makes no sense to wrap shell commands in Python. You can achieve everything you need in Python itself, without workarounds:

  • Open the input file and read the first 3 bytes. If they are 1F 8B 08, it should be a gzip file.
  • Reset the file position.
  • Pass the file contents to zlib.decompress() if it is a gzip file; otherwise read the file as-is.
  • If necessary, pass the contents through the filter function.
  • Write the results to a file.

EDIT

This will not work as described, since the gzip header would need to be stripped before handing the data to zlib. However, it is possible to check the first 3 bytes, call fh.seek(0), and pass the file to gzip.open() if you want to make sure the file is gzip (with DEFLATE compression).
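
The header check could look roughly like the sketch below. It is written for Python 3 and makes some assumptions: open_maybe_gzip is a hypothetical helper name, the input is a seekable file (for example on mounted storage), and gzip.GzipFile(fileobj=...) is used in place of gzip.open() so that the already opened and rewound file handle is reused. The temporary files exist only for the demo.

```python
import gzip
import os
import tempfile

GZIP_HEADER = b"\x1f\x8b\x08"  # magic bytes 1F 8B plus 08 for DEFLATE


def open_maybe_gzip(path):
    """Return a binary stream for path, decompressing if it looks like gzip."""
    fh = open(path, "rb")
    header = fh.read(3)
    fh.seek(0)                       # rewind so the reader sees the whole file
    if header == GZIP_HEADER:
        # Note: closing the GzipFile does not close fh itself.
        return gzip.GzipFile(fileobj=fh, mode="rb")
    return fh


# Demo with two small throwaway files (hypothetical names and contents).
tmpdir = tempfile.mkdtemp()
plain_path = os.path.join(tmpdir, "reads.fasta")
gz_path = os.path.join(tmpdir, "reads.fastq.gz")
with open(plain_path, "wb") as f:
    f.write(b">seq1\nACGT\n")
with gzip.open(gz_path, "wb") as f:
    f.write(b"@seq1\nACGT\n+\nhhhh\n")

with open_maybe_gzip(plain_path) as stream:
    first_plain = stream.read(1)     # first byte of the uncompressed file
with open_maybe_gzip(gz_path) as stream:
    first_gz = stream.read(1)        # first byte after decompression
```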

It may be easier to just pass the file to gzip.open() and catch the exception if the file is not gzipped:

    import gzip

    try:
        in_file = gzip.open("infile")
        f_contents = in_file.read()
    except IOError as e:
        # Re-raise the exception if its message is not "Not a gzipped file".
        # Perhaps it would be safer to check the header!
        if str(e) != "Not a gzipped file":
            raise
        in_file = open("infile")
        f_contents = in_file.read()
    in_file.close()

    # Note: read() above loads the entire file into memory.
    if f_contents[0] == "@":
        result = filter_function(f_contents)
    else:
        result = f_contents

    new_file = open("new_file", "w")
    new_file.write(result)
    new_file.close()
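
Since read() pulls the whole file into memory and the question mentions inputs several times larger than RAM, a chunked variant may be preferable. The following is only a sketch for Python 3: route_stream and both sinks are hypothetical names. In practice fastq_sink could be the stdin pipe of a filter process started with stdin=PIPE, and other_sink a local file opened in binary mode; the demo uses in-memory buffers instead.

```python
import io
import shutil


def route_stream(source, fastq_sink, other_sink, chunk_size=64 * 1024):
    """Peek at the first byte of source, then copy the whole stream
    (peeked byte included) to the matching sink. Returns the peeked byte."""
    first = source.read(1)                         # the one-byte "peek"
    sink = fastq_sink if first == b"@" else other_sink
    sink.write(first)                              # put the byte back in front
    shutil.copyfileobj(source, sink, chunk_size)   # stream the rest in chunks
    return first


# Demo with in-memory streams standing in for the real source and sinks.
fastq_out, other_out = io.BytesIO(), io.BytesIO()
kind_fastq = route_stream(io.BytesIO(b"@seq1\nACGT\n+\nhhhh\n"), fastq_out, other_out)

fastq_out2, other_out2 = io.BytesIO(), io.BytesIO()
kind_fasta = route_stream(io.BytesIO(b">seq1\nACGT\n"), fastq_out2, other_out2)
```

The trade-off is that all data passes through the Python process rather than flowing directly between child processes, but memory use stays bounded by chunk_size.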

Source: https://habr.com/ru/post/949691/

