mrjob: setting up logging on EMR

I'm trying to use mrjob to run Hadoop on EMR and can't figure out how to set up logging (user-generated logs in the map/reduce steps) so that I can access them after the cluster terminates.

I tried logging with the logging module, print, and sys.stderr.write(), but so far no luck. The only option that works for me is to write the logs to a file, then SSH into the machine and read it, but that is cumbersome. I would like my logs to go to stderr / stdout / syslog and be collected automatically to S3, so I can view them after the cluster terminates.

Here is the word_freq example with logging added:

    """The classic MapReduce job: count the frequency of words.
    """
    from mrjob.job import MRJob
    import re
    import logging
    import logging.handlers
    import sys

    WORD_RE = re.compile(r"[\w']+")


    class MRWordFreqCount(MRJob):

        def mapper_init(self):
            self.logger = logging.getLogger()
            self.logger.setLevel(logging.INFO)
            self.logger.addHandler(logging.FileHandler("/tmp/mr.log"))
            self.logger.addHandler(logging.StreamHandler())
            self.logger.addHandler(logging.StreamHandler(sys.stdout))
            self.logger.addHandler(logging.handlers.SysLogHandler())

        def mapper(self, _, line):
            self.logger.info("Test logging: %s", line)
            sys.stderr.write("Test stderr: %s\n" % line)
            print "Test print: %s" % line
            for word in WORD_RE.findall(line):
                yield (word.lower(), 1)

        def combiner(self, word, counts):
            yield (word, sum(counts))

        def reducer(self, word, counts):
            yield (word, sum(counts))


    if __name__ == '__main__':
        MRWordFreqCount.run()
2 answers

Of all the options, the only one that really works is writing to stderr, either directly (sys.stderr.write) or through a logger with a StreamHandler attached to stderr.
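As a minimal sketch of the stderr approach, the helper below builds a logger whose only handler writes to stderr. The helper name `make_stderr_logger`, the logger name `mr_task`, and the message format are assumptions for illustration and are not part of mrjob. Under Hadoop Streaming, stdout carries the task's key/value output, which is why stderr is the channel used here.

```python
import logging
import sys


def make_stderr_logger(name="mr_task", level=logging.INFO):
    """Return a logger that writes records to stderr (hypothetical helper)."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Attach the handler only once, so repeated calls in the same process
    # (for example from mapper_init and reducer_init) do not duplicate lines.
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    # Do not pass records on to the root logger's handlers.
    logger.propagate = False
    return logger


logger = make_stderr_logger()
logger.info("Test logging: %s", "some input line")
```

In a job, this would presumably be called from mapper_init (and reducer_init, if the reducer logs too) and the result stored on self, in place of the four handlers in the question.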

The logs can be retrieved after the job finishes (successfully or with an error) from:

[s3_log_uri]/[jobflow-id]/task-attempts/[job-id]/[attempt-id]/stderr
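For illustration, the location above can be assembled with a small helper. This is only a sketch: the function name `task_stderr_uri` is hypothetical, every identifier in the usage line is a made-up placeholder, and it assumes the literal path components are `task-attempts` and `stderr`.

```python
def task_stderr_uri(s3_log_uri, jobflow_id, job_id, attempt_id):
    """Build the S3 URI of one task attempt's stderr log (hypothetical helper)."""
    parts = [
        s3_log_uri.rstrip("/"),  # tolerate a trailing slash on the log URI
        jobflow_id,
        "task-attempts",
        job_id,
        attempt_id,
        "stderr",
    ]
    return "/".join(parts)


# All values below are placeholders, not real identifiers.
print(task_stderr_uri("s3://my-bucket/logs/", "j-EXAMPLE",
                      "job_0001", "attempt_0001_m_000000_0"))
```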

Make sure your runners.emr.cleanup configuration keeps the logs.


Here is an example of logging to stdout (Python 3):

    from mrjob.job import MRJob
    from mrjob.step import MRStep
    from mrjob.util import log_to_stream
    import re
    import logging

    log = logging.getLogger(__name__)

    WORD_RE = re.compile(r'[\w]+')


    class MostUsedWords(MRJob):

        @classmethod
        def set_up_logging(cls, quiet=False, verbose=False, stream=None):
            # Route both mrjob's own logger and this module's logger
            # to the same stream.
            log_to_stream(name='mrjob', debug=verbose, stream=stream)
            log_to_stream(name='__main__', debug=verbose, stream=stream)

        def steps(self):
            return [
                MRStep(mapper=self.mapper_get_words,
                       combiner=self.combiner_get_words,
                       reducer=self.reduce_get_words),
                MRStep(reducer=self.reducer_find_max),
            ]

        def mapper_get_words(self, _, line):
            for word in WORD_RE.findall(line):
                yield (word.lower(), 1)

        def combiner_get_words(self, word, counts):
            yield (word, sum(counts))

        def reduce_get_words(self, word, counts):
            # counts is a one-shot iterator: materialize it once so that
            # logging it does not leave nothing for sum() to add up.
            counts = list(counts)
            log.info(word + "\t" + str(counts))
            yield None, (sum(counts), word)

        def reducer_find_max(self, key, value):
            # value is an iterable of (count, word) pairs, i.e. tuples
            yield max(value)


    if __name__ == '__main__':
        MostUsedWords.run()

Source: https://habr.com/ru/post/1203723/

