I have a simple program that reads a large file containing several million lines, parses each line ( numpy array ) and converts it into a doubling array ( python array ), and then writes to hdf5 file . I repeat this cycle for several days. After reading each file, I delete all the objects and call the garbage collector. When I run the program, the first day is parsed without any error, but on the second day I get a MemoryError . I controlled the memory usage in my program, on the first day of parsing, the memory usage is 1.5 GB . At the end of processing the first day, memory usage is reduced to 50 MB . Now, when the second day begins, and I try to read the lines from the file, I get a MemoryError . The following is the output of the program.
source file extracted at C:\rfadump\au\2012.08.07.txt parsing started current time: 2012-09-16 22:40:16.829000 500000 lines parsed 1000000 lines parsed 1500000 lines parsed 2000000 lines parsed 2500000 lines parsed 3000000 lines parsed 3500000 lines parsed 4000000 lines parsed 4500000 lines parsed 5000000 lines parsed parsing done. end time is 2012-09-16 23:34:19.931000 total time elapsed 0:54:03.102000 repacking file done > s:\users\aaj\projects\pythonhf\rfadumptohdf.py(132)generateFiles() -> while single_date <= self.end_date: (Pdb) c *** 2012-08-08 *** source file extracted at C:\rfadump\au\2012.08.08.txt cought an exception while generating file for day 2012-08-08. Traceback (most recent call last): File "rfaDumpToHDF.py", line 175, in generateFile lines = self.rawfile.read().split('|\n') MemoryError
I am very sure that the Windows system task manager shows memory usage as 50 MB for this process. It seems like the garbage collector or memory manager for Python is not correctly calculating free memory. There should be a lot of free memory, but she thinks that is not enough.
Any idea?
EDIT
Adding my code here
I will put parts of my code. I'm new to python, please forgive my python coding style.
module 1
def generateFile(self, current_date): try: print "*** %s ***" % current_date.strftime("%Y-%m-%d") weekday=current_date.weekday() if weekday >= 5: print "skipping weekend" return self.taqdb = taqDB(self.index, self.offset) cache_filename = os.path.join(self.cache_dir,current_date.strftime("%Y.%m.%d.h5")) outputFile = config.hdf5.filePath(self.index, date=current_date) print "cache file: ", cache_filename print "output file: ", outputFile tempdir = "C:\\rfadump\\"+self.region+"\\" input_filename = tempdir + filename print "source file extracted at %s " % input_filename
module 2 - taqdb - to store the analyzed data in an array
class taqDB: def __init__(self, index, offset): self.index = index self.tickcfg = config.hdf5.getTickConfig(index) self.offset = offset self.groups = {} def getGroup(self,ric): if (self.groups.has_key(ric) == False): self.groups[ric] = {} return self.groups[ric] def getOrderbookArray(self, ric, group): datasetname = orderBookName prodtype = self.tickcfg.getProdType(ric) if(prodtype == ProdType.INDEX): return orderbookArrayShape = self.tickcfg.getOrderBookArrayShape(prodtype) if(group.has_key(datasetname) == False): group[datasetname] = array.array("d") orderbookArray = self.tickcfg.getOrderBookArray(prodtype) return orderbookArray else: orderbookArray = group[datasetname] if(len(orderbookArray) == 0): return self.tickcfg.getOrderBookArray(prodtype) lastOrderbook = orderbookArray[-orderbookArrayShape[1]:] return np.array([lastOrderbook]) def addToDataset(self, group, datasetname, timestamp, arr): if(group.has_key(datasetname) == False): group[datasetname] = array.array("d") arr[0,0]=timestamp a1 = group[datasetname] a1.extend(arr[0]) def addToOrderBook(self, group, timestamp, arr): self.addToDataset(self, group, orderBookName, timestamp, arr) def insert(self, data): ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = data delta = dt.timedelta(hours=timestamp.hour,minutes=timestamp.minute, seconds=timestamp.second, microseconds=(timestamp.microsecond/1000)) timestamp = float(str(delta.seconds)+'.'+str(delta.microseconds)) + self.offset
Module 3 - Parser
class rfaTextToTAQ: """RFA Raw dump file reader. Readers single line (record) and returns an array or array of fid value pairs.""" def __init__(self,tickconfig): self.tickconfig = tickconfig self.token = '' self.state = ReadState.SEQ_NUM self.fvstate = fvstate.FID self.quotes = np.array([]) # read from tickconfig self.trades = np.array([]) # read from tickconfig self.prodtype = ProdType.STOCK self.allquotes = {} self.alltrades = {} self.acvol = 0 self.levelsUpdated = [] self.quoteUpdate = False self.tradeUpdate = False self.depth = 0 def updateLevel(self, index): if(self.levelsUpdated.__contains__(index) == False): self.levelsUpdated.append(index) def updateQuote(self, fidindex, field): self.value = float(self.value) if(self.depth == 1): index = fidindex[0]+(len(self.tickconfig.stkQuotes)*(self.depth - 1)) self.quotes[index[0]][fidindex[1][0]] = self.value self.updateLevel(index[0]) else: self.quotes[fidindex] = self.value self.updateLevel(fidindex[0][0]) self.quoteUpdate = True def updateTrade(self, fidindex, field): #self.value = float(self.value) if(self.tickconfig.tradeUpdate(self.depth) == False): return newacvol = float(self.value) if(field == acvol): if(self.value > self.acvol): tradesize = newacvol - self.acvol self.acvol = newacvol self.trades[fidindex] = tradesize if(self.trades.__contains__(0) == False): self.tradeUpdate = True else: self.trades[fidindex] = self.value if(not (self.trades[0,1]==0 or self.trades[0,2]==0)): self.tradeUpdate = True def updateResult(self): field = '' valid, field = field_dict.FIDToField(int(self.fid), field) if(valid == False): return if(self.value == '0'): return if(self.prodtype == ProdType.STOCK): fidindex = np.where(self.tickconfig.stkQuotes == field) if(len(fidindex[0]) == 0): fidindex = np.where(self.tickconfig.stkTrades == field) if(len(fidindex[0]) == 0): return else: self.updateTrade(fidindex, field) else: self.updateQuote(fidindex, field) else: fidindex = np.where(self.tickconfig.futQuotes == field) if(len(fidindex[0]) == 0): fidindex = np.where(self.tickconfig.futTrades == field) if(len(fidindex[0]) == 0): return else: self.updateTrade(fidindex, field) else: self.updateQuote(fidindex, field) def getOrderBookTrade(self): if (self.allquotes.has_key(self.ric) == False): acvol = 0 self.allquotes[self.ric] = self.tickconfig.getOrderBookArray(self.prodtype) trades = self.tickconfig.getTradesArray() self.alltrades[self.ric] = [trades, acvol] return self.allquotes[self.ric], self.alltrades[self.ric] def parseline(self, line): self.tradeUpdate = False self.levelsUpdated = [] pos = 0 length = len(line) self.state = ReadState.SEQ_NUM self.fvstate = fvstate.FID self.token = '' ch = '' while(pos < length): prevChar = ch ch = line[pos] pos += 1 #SEQ_NUM if(self.state == ReadState.SEQ_NUM): if(ch != ','): self.token += ch else: self.seq_num = int(self.token) self.state = ReadState.TIMESTAMP self.token = '' # TIMESTAMP elif(self.state == ReadState.TIMESTAMP): if(ch == ' '): self.token = '' elif(ch != ','): self.token += ch else: if(len(self.token) != 12): print "Invalid timestamp format. %s. skipping line.\n", self.token self.state = ReadState.SKIPLINE else: self.timestamp = datetime.strptime(self.token,'%H:%M:%S.%f') self.state = ReadState.RIC self.token = '' # RIC elif(self.state == ReadState.RIC): if(ch != ','): self.token += ch else: self.ric = self.token self.token = '' self.ric, self.depth = self.tickconfig.replaceRic(self.ric) self.prodtype = self.tickconfig.getProdType(self.ric) if(self.tickconfig.subscribed(self.ric)): self.state = ReadState.UPDATE_TYPE self.quotes, trades = self.getOrderBookTrade() self.trades = trades[0] self.acvol = trades[1] else: self.state = ReadState.SKIPLINE # UPDATE_TYPE elif(self.state == ReadState.UPDATE_TYPE): if(ch != '|'): self.token += ch else: self.update_type = self.token self.token = '' self.state = ReadState.FVPAIRS #SKIPLINE elif(self.state == ReadState.SKIPLINE): return None # FV PAIRS elif(self.state == ReadState.FVPAIRS): # FID if(self.fvstate == fvstate.FID): if(ch != ','): if(ch.isdigit() == False): self.token = self.value+ch self.fvstate = fvstate.FIDVALUE self.state = ReadState.FVPAIRS else: self.token += ch else: self.fid = self.token self.token = '' self.fvstate = fvstate.FIDVALUE self.state = ReadState.FVPAIRS # FIDVALUE elif(self.fvstate == fvstate.FIDVALUE): if(ch != '|'): self.token += ch else: self.value = self.token self.token = '' self.state = ReadState.FVPAIRS self.fvstate = fvstate.FID # TODO set value self.updateResult() return self.ric, self.timestamp, self.quotes, self.trades, self.levelsUpdated, self.tradeUpdate
Thanks.