Multiple inputs with MRJob

I am trying to learn how to use Yelp's Python framework for MapReduce, mrjob. Their simple word-counter example makes sense, but I'm curious how you would handle an application with multiple inputs. For example, instead of simply counting words in a document, multiplying a vector by a matrix. I came up with this solution, which works, but feels clumsy:

    from mrjob.job import MRJob

    class MatrixVectMultiplyTast(MRJob):

        def multiply(self, key, line):
            line = map(float, line.split(" "))
            v, col = line[-1], line[:-1]
            for i in xrange(len(col)):
                yield i, col[i] * v

        def sum(self, i, occurrences):
            yield i, sum(occurrences)

        def steps(self):
            return [self.mr(self.multiply, self.sum), ]

    if __name__ == "__main__":
        MatrixVectMultiplyTast.run()

This code is run with ./matrix.py < input.txt, and the only reason it works is because the matrix is stored in input.txt column by column, with the corresponding vector value at the end of each line.

So the following matrix and vector:

[image: the matrix and the vector]

are represented in input.txt as:

[image: the contents of input.txt]
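As an illustration of that layout (made-up numbers, not the original figures): for the matrix [[1, 2], [3, 4]] and the vector [5, 6], input.txt would hold one matrix column per line with its vector entry appended:

    1 3 5
    2 4 6

and the job above would produce the product [1*5 + 2*6, 3*5 + 4*6] = [17, 39], emitted as the key-value pairs (0, 17) and (1, 39).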

In short: how can I store the matrix and vector more naturally in separate files and pass them both into mrjob?

+6
4 answers

If you need to process your raw data against another (or the same) dataset row by row (row_i against row_j), you can either:

1) Create an S3 bucket to store a copy of your data. Pass the location of this copy to your task class, e.g. self.options.bucket and self.options.my_datafile_copy_location in the code below. Caveat: unfortunately, it seems the entire file has to be "downloaded" to the task machines before being processed, so if the connection falters or it takes too long to load, the job may fail. Here is some Python/mrjob code to do this.

Put this in your mapper function:

    # Assumes boto is imported at the top of your job file (import boto).
    d1 = line1.split('\t', 1)
    v1, col1 = d1[0], d1[1]

    conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>,
                           aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
    bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
    data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
    ### CAVEAT: Needs to get the whole file before processing the rest.

    for line2 in data_copy.split('\n'):
        d2 = line2.split('\t', 1)
        v2, col2 = d2[0], d2[1]
        ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
        yield <your output key, value pairs>

    conn.close()
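For completeness, here is one way self.options.bucket and self.options.my_datafile_copy_location could be defined on the job class. This is only a sketch assuming mrjob's older, optparse-based API (configure_options / add_passthrough_option; newer mrjob versions use configure_args / add_passthru_arg instead), and YourJob is a placeholder for your own job class:

    def configure_options(self):
        super(YourJob, self).configure_options()
        # Passed on the command line, e.g. --bucket=my-bucket
        self.add_passthrough_option('--bucket',
            help='Name of the S3 bucket holding the copied data file')
        # optparse turns the dashes into underscores: self.options.my_datafile_copy_location
        self.add_passthrough_option('--my-datafile-copy-location',
            help='S3 key of the copied data file within the bucket')

The job would then be invoked with something like --bucket=<your-bucket> --my-datafile-copy-location=<your-key> on the command line.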

2) Create a SimpleDB domain and store all of your data there. Read about boto and SimpleDB here: http://code.google.com/p/boto/wiki/SimpleDbIntro

Your mapper code will look like this:

    dline = dline.strip()
    d0 = dline.split('\t', 1)
    v1, c1 = d0[0], d0[1]

    sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>,
                           aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
    domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)

    for item in domain:
        v2, c2 = item.name, item['column']
        ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
        yield <your output key, value pairs>

    sdb.close()

This second option may perform better if you have very large amounts of data, since it can issue requests for each row of data rather than fetching the whole dataset at once. Keep in mind that SimpleDB values can be at most 1024 characters long, so you may need to compress/decompress your values by some method if they are larger than that.
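For this option the data has to be loaded into SimpleDB once, before the job runs. Below is a minimal loader sketch with boto; the domain name, file name, and line layout ("<row_id>" tab "<column data>") are assumptions, not part of the original answer:

    import boto

    sdb = boto.connect_sdb(aws_access_key_id='<AWS_ACCESS_KEY>',
                           aws_secret_access_key='<AWS_SECRET_ACCESS_KEY>')
    domain = sdb.create_domain('MY_DOMAIN_STRING_NAME')  # placeholder domain name

    # Assumed layout: each line of the local file is "<row_id>\t<column data>".
    with open('my_datafile.txt') as f:
        for line in f:
            row_id, column = line.rstrip('\n').split('\t', 1)
            item = domain.new_item(row_id)   # item.name is what the mapper reads back
            item['column'] = column          # read back as item['column'] in the mapper
            item.save()

    sdb.close()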

+3

The real answer to your question is that mrjob does not quite yet support the Hadoop streaming join pattern, which is to read the map_input_file environment variable (which exposes the map.input.file property) in order to determine which type of file you are dealing with, based on its path and/or name.

You may still be able to pull it off if you can easily detect, just from reading the data itself, which type a record belongs to, as shown in this article:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/
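The gist of that approach: if each input's records are recognizable on sight, the mapper can tag them itself and the reducer can join them on a shared key. A sketch of the idea for the matrix/vector case, under the purely hypothetical assumption that matrix lines carry three fields ("i j value") and vector lines carry two ("j value"):

    def mapper(self, _, line):
        fields = line.split()
        if len(fields) == 3:                 # assumed matrix line: "i j value"
            i, j, value = fields
            yield j, ('M', i, float(value))
        else:                                # assumed vector line: "j value"
            j, value = fields
            yield j, ('V', float(value))

    def reducer(self, j, tagged_values):
        matrix_entries, vector_value = [], None
        for tagged in tagged_values:
            if tagged[0] == 'M':
                matrix_entries.append((tagged[1], tagged[2]))
            else:
                vector_value = tagged[1]
        if vector_value is not None:
            for i, a_ij in matrix_entries:
                # Partial products keyed by row index i.
                yield i, a_ij * vector_value

A second step would then sum the partial products per row index to finish the multiplication.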

However, this is not always possible ...

Otherwise, mrjob looks fantastic, and I wish they would add support for this in the future. Until then, this is quite a limitation for me.

+2

This is how I use multiple inputs and, based on the file name, make the appropriate changes in the mapper phase.

Runner program:

    import os
    import datetime

    from mrjob.hadoop import *

    # Define all arguments
    os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
    print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

    job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
    hadoop_bin = '/usr/bin/hadoop'
    mode = 'hadoop'
    hs = HadoopFilesystem([hadoop_bin])

    input_file_names = ["hdfs:///app/input_file1/", "hdfs:///app/input_file2/"]

    aargs = ['-r', mode, '--jobconf', 'mapred.job.name=JobName',
             '--jobconf', 'mapred.reduce.tasks=3', '--no-output',
             '--hadoop-bin', hadoop_bin]
    aargs.extend(input_file_names)
    aargs.extend(['-o', output_dir])  # output_dir is assumed to be defined elsewhere
    print aargs

    status_file = True

    mr_job = MRJob(args=aargs)
    with mr_job.make_runner() as runner:
        runner.run()

    os.environ['HADOOP_HOME'] = ''
    print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

MRJob Class:

    from mrjob.job import MRJob
    # get_jobconf_value lives in mrjob.compat in older mrjob releases;
    # newer releases expose jobconf_from_env instead.
    from mrjob.compat import get_jobconf_value

    class MR_Job(MRJob):

        DEFAULT_OUTPUT_PROTOCOL = 'repr_value'

        def mapper(self, _, line):
            """
            This function reads lines from file.
            """
            try:
                # Need to clean email.
                input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
                """
                Mapper code
                """
            except Exception, e:
                print e

        def reducer(self, email_id, visitor_id__date_time):
            try:
                """
                Reducer Code
                """
            except:
                pass

    if __name__ == '__main__':
        MR_Job.run()
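For example, the "Mapper code" placeholder above could branch on the directory name pulled out of map.input.file. The directory names below match the runner above; the emitted key/value pairs are just a hypothetical illustration:

    # Inside mapper(), after input_file_name has been extracted:
    if input_file_name == 'input_file1':
        # Records from the first input, keyed on the first tab-separated field
        yield line.split('\t', 1)[0], ('file1', line)
    elif input_file_name == 'input_file2':
        # Records from the second input
        yield line.split('\t', 1)[0], ('file2', line)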
+2

As I understand it, you would not be using mrjob unless you wanted to run against a Hadoop cluster or Amazon's Hadoop services (EMR), even though the examples work with local files.

mrjob essentially uses "Hadoop streaming" to submit the job.

This means that all input specified as files or folders from Hadoop is streamed to the mapper, and subsequent results to the reducer. Each mapper receives a slice of the input and assumes that all of the input data shares the same schema, so it parses and processes every slice uniformly into key-value pairs.

Following from that understanding, the inputs to a mapper must share the same schema. The only way to include data with two different schemas is to interleave them in a single file in such a way that the mapper can tell which lines are vector data and which are matrix data.

You are actually doing it already.

You can simply improve on this by adding a qualifier to each line saying whether it is matrix data or vector data. When the mapper sees vector data, the previously seen matrix data can be applied to it. For example:

    matrix, 1, 2, ...
    matrix, 2, 4, ...
    vector, 3, 4, ...
    matrix, 1, 2, ...
    .....
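One possible way for a mapper to consume that interleaved, tagged layout is sketched below. It is only a sketch under two assumptions not stated in the answer: each "matrix" line carries one column, and a following "vector" line carries the entries that multiply the columns buffered since the previous vector line. Note that buffering across lines only works if those lines land in the same input split.

    def mapper_init(self):
        self.pending_columns = []

    def mapper(self, _, line):
        parts = [p.strip() for p in line.split(',')]
        tag, values = parts[0], map(float, parts[1:])
        if tag == 'matrix':
            # Buffer one matrix column until its vector entries arrive.
            self.pending_columns.append(values)
        elif tag == 'vector':
            # Pair each buffered column with its vector entry and emit partial products.
            for col, v in zip(self.pending_columns, values):
                for i in xrange(len(col)):
                    yield i, col[i] * v   # summed per row index by the reducer
            self.pending_columns = []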

But the approach you described works well: you just have to have all the data with a single schema in one file.

It still has drawbacks, though. The key-value MapReduce model works best when a complete schema is present on a single line, so that each line contains a complete, self-contained unit of processing.

In my opinion, you are already doing it right, but I think MapReduce is not a well-suited mechanism for this kind of data. I hope someone can clarify this even further than I could.

+1

Source: https://habr.com/ru/post/908565/

