Transition from multiprocessing to streaming

In my project, I use the multiprocessing class to do tasks in parallel. Instead, I want to use threading , as it has better performance (my tasks are related to TCP / IP, not CPU or I / O).

multiprocessing has great features like Pool.imap_unordered and Pool.map_async , which does not exist in the threading class.

What is the correct way to convert my code to use threading instead? The documentation introduces the multiprocessing.dummy class, which is a wrapper for the threading class. However, this causes a lot of errors (at least on python 2.7.3):

  pool = multiprocessing.Pool(processes) File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 150, in Pool return ThreadPool(processes, initializer, initargs) File "C:\python27\lib\multiprocessing\pool.py", line 685, in __init__ Pool.__init__(self, processes, initializer, initargs) File "C:\python27\lib\multiprocessing\pool.py", line 136, in __init__ self._repopulate_pool() File "C:\python27\lib\multiprocessing\pool.py", line 199, in _repopulate_pool w.start() File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 73, in start self._parent._children[self] = None AttributeError: '_DummyThread' object has no attribute '_children' 

Edit: What actually happens is that I have a GUI that launches another thread (to prevent the GUI from freezing from gettint). This thread performs a specific search function that has a ThreadPool that fails.

Edit 2: The fix is ​​fixed and will be included in future releases. It is clearly seen that the starch is fixed!

 import urllib2, htmllib, formatter import multiprocessing.dummy as multiprocessing import xml.dom.minidom import os import string, random from urlparse import parse_qs, urlparse from useful_util import retry import config from logger import log class LinksExtractor(htmllib.HTMLParser): def __init__(self, formatter): htmllib.HTMLParser.__init__(self, formatter) self.links = [] self.ignoredSites = config.WebParser_ignoredSites def start_a(self, attrs): for attr in attrs: if attr[0] == "href" and attr[1].endswith(".mp3"): if not filter(lambda x: (x in attr[1]), self.ignoredSites): self.links.append(attr[1]) def get_links(self): return self.links def GetLinks(url, returnMetaUrlObj=False): ''' Function gather links from a url. @param url: Url Address. @param returnMetaUrlObj: If true, returns a MetaUrl Object list. Else, returns a string list. Default is False. @return links: Look up. ''' htmlparser = LinksExtractor(formatter.NullFormatter()) try: data = urllib2.urlopen(url) except (urllib2.HTTPError, urllib2.URLError) as e: log.error(e) return [] htmlparser.feed(data.read()) htmlparser.close() links = list(set(htmlparser.get_links())) if returnMetaUrlObj: links = map(MetaUrl, links) return links def isAscii(s): "Function checks is the string is ascii." try: s.decode('ascii') except (UnicodeEncodeError, UnicodeDecodeError): return False return True @retry(Exception, logger=log) def parse(song, source): ''' Function parses the source search page and returns the .mp3 links in it. @param song: Search string. @param source: Search website source. Value can be dilandau, mp3skull, youtube, seekasong. @return links: .mp3 url links. ''' source = source.lower() if source == "dilandau": return parse_dilandau(song) elif source == "mp3skull": return parse_Mp3skull(song) elif source == "SeekASong": return parse_SeekASong(song) elif source == "youtube": return parse_Youtube(song) log.error('no source "%s". (from parse function in WebParser)') return [] def parse_dilandau(song, pages=1): "Function connects to Dilandau.eu and returns the .mp3 links in it" if not isAscii(song): # Dilandau doesn't like unicode. log.warning("Song is not ASCII. Skipping on dilandau") return [] links = [] song = urllib2.quote(song.encode("utf8")) for i in range(pages): url = 'http://en.dilandau.eu/download_music/%s-%d.html' % (song.replace('-','').replace(' ','-').replace('--','-').lower(),i+1) log.debug("[Dilandau] Parsing %s... " % url) links.extend(GetLinks(url, returnMetaUrlObj=True)) log.debug("[Dilandau] found %d links" % len(links)) for metaUrl in links: metaUrl.source = "Dilandau" return links def parse_Mp3skull(song, pages=1): "Function connects to mp3skull.com and returns the .mp3 links in it" links = [] song = urllib2.quote(song.encode("utf8")) for i in range(pages): # http://mp3skull.com/mp3/how_i_met_your_mother.html url = 'http://mp3skull.com/mp3/%s.html' % (song.replace('-','').replace(' ','_').replace('__','_').lower()) log.debug("[Mp3skull] Parsing %s... " % url) links.extend(GetLinks(url, returnMetaUrlObj=True)) log.debug("[Mp3skull] found %d links" % len(links)) for metaUrl in links: metaUrl.source = "Mp3skull" return links def parse_SeekASong(song): "Function connects to seekasong.com and returns the .mp3 links in it" song = urllib2.quote(song.encode("utf8")) url = 'http://www.seekasong.com/mp3/%s.html' % (song.replace('-','').replace(' ','_').replace('__','_').lower()) log.debug("[SeekASong] Parsing %s... " % url) links = GetLinks(url, returnMetaUrlObj=True) for metaUrl in links: metaUrl.source = "SeekASong" log.debug("[SeekASong] found %d links" % len(links)) return links def parse_Youtube(song, amount=10): ''' Function searches a song in youtube.com and returns the clips in it using Youtube API. @param song: The search string. @param amount: Amount of clips to obtain. @return links: List of links. ''' "Function connects to youtube.com and returns the .mp3 links in it" song = urllib2.quote(song.encode("utf8")) url = r"http://gdata.youtube.com/feeds/api/videos?q=%s&max-results=%d&v=2" % (song.replace(' ', '+'), amount) urlObj = urllib2.urlopen(url, timeout=4) data = urlObj.read() videos = xml.dom.minidom.parseString(data).getElementsByTagName('feed')[0].getElementsByTagName('entry') links = [] for video in videos: youtube_watchurl = video.getElementsByTagName('link')[0].attributes.item(0).value links.append(get_youtube_hightest_quality_link(youtube_watchurl)) return links def get_youtube_hightest_quality_link(youtube_watchurl, priority=config.youtube_quality_priority): ''' Function returns the highest quality link for a specific youtube clip. @param youtube_watchurl: The Youtube Watch Url. @param priority: A list represents the qualities priority. @return MetaUrlObj: MetaUrl Object. ''' video_id = parse_qs(urlparse(youtube_watchurl).query)['v'][0] youtube_embedded_watchurl = "http://www.youtube.com/embed/%s?autoplay=1" % video_id d = get_youtube_dl_links(video_id) for x in priority: if x in d.keys(): return MetaUrl(d[x][0], 'youtube', d['VideoName'], x, youtube_embedded_watchurl) log.error("No Youtube link has been found in get_youtube_hightest_quality_link.") return "" @retry(Exception, logger=log) def get_youtube_dl_links(video_id): ''' Function gets the download links for a youtube clip. This function parses the get_video_info format of youtube. @param video_id: Youtube Video ID. @return d: A dictonary of qualities as keys and urls as values. ''' d = {} url = r"http://www.youtube.com/get_video_info?video_id=%s&el=vevo" % video_id urlObj = urllib2.urlopen(url, timeout=12) data = urlObj.read() data = urllib2.unquote(urllib2.unquote(urllib2.unquote(data))) data = data.replace(',url', '\nurl') data = data.split('\n') for line in data: if 'timedtext' in line or 'status=fail' in line or '<AdBreaks>' in line: continue try: url = line.split('&quality=')[0].split('url=')[1] quality = line.split('&quality=')[1].split('&')[0] except: continue if quality in d: d[quality].append(url) else: d[quality] = [url] try: videoName = "|".join(data).split('&title=')[1].split('&')[0] except Exception, e: log.error("Could not parse VideoName out of get_video_info (%s)" % str(e)) videoName = "" videoName = unicode(videoName, 'utf-8') d['VideoName'] = videoName.replace('+',' ').replace('--','-') return d class NextList(object): "A list with a 'next' method." def __init__(self, l): self.l = l self.next_index = 0 def next(self): if self.next_index < len(self.l): value = self.l[self.next_index] self.next_index += 1 return value else: return None def isEOF(self): " Checks if the list has reached the end " return (self.next_index >= len(self.l)) class MetaUrl(object): "a url strecture data with many metadata" def __init__(self, url, source="", videoName="", quality="", youtube_watchurl=""): self.url = str(url) self.source = source self.videoName = videoName # Youtube Links Only self.quality = quality # Youtube Links Onlys self.youtube_watchurl = youtube_watchurl # Youtube Links Onlys def __repr__(self): return "<MetaUrl '%s' | %s>" % (self.url, self.source) def search(song, n, processes=config.search_processes): ''' Function searches song and returns n valid .mp3 links. @param song: Search string. @param n: Number of songs. @param processes: Number of processes to launch in the subprocessing pool. ''' linksFromSources = [] pool = multiprocessing.Pool(processes) args = [(song, source) for source in config.search_sources] imapObj = pool.imap_unordered(_parse_star, args) for i in range(len(args)): linksFromSources.append(NextList(imapObj.next(15))) pool.terminate() links = [] next_source = 0 while len(links) < n and not all(map(lambda x: x.isEOF(), linksFromSources)): nextItem = linksFromSources[next_source].next() if nextItem: log.debug("added song %.80s from source ID %d (%s)" % (nextItem.url.split('/')[-1], next_source, nextItem.source)) links.append(nextItem) if len(linksFromSources) == next_source+1: next_source = 0 else: next_source += 1 return links def _parse_star(args): return parse(*args) 
+6
source share
1 answer

I can not reproduce your problem on my machine. What is in your processes variable? Is this an int ?

 Python 2.7.3 (default, Apr 10 2012, 23:31:26) [MSC v.1500 32 bit (Intel)] on win32 Type "help", "copyright", "credits" or "license" for more information. >>> import multiprocessing.dummy as multiprocessing >>> pool = multiprocessing.Pool(5) >>> pool <multiprocessing.pool.ThreadPool object at 0x00C7DF90> >>> 

---- Editing ----

You probably also want to double check if you messed up the standard library, try a clean install of python 2.7.3 to another folder.

---- Edit 2 ----

You can quickly execute the patch like this:

 import multiprocessing.dummy import weakref import threading class Worker(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): poll = multiprocessing.dummy.Pool(5) print str(poll) w = Worker() w._children = weakref.WeakKeyDictionary() w.start() 
+6
source

Source: https://habr.com/ru/post/916274/


All Articles