Multiple threads writing the same CSV in Python

I am new to Python multithreading and am currently writing a script that appends rows to a CSV file. I submit several tasks to a concurrent.futures.ThreadPoolExecutor, and each task adds lines to the CSV file. What can I do to guarantee thread safety if appending is the only file-related operation performed by these threads?

A simplified version of my code:

# Fan out: one download task per advertiser, with at most 3 running at once.
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for count,ad_id in enumerate(advertisers):

        # "arguments....." is a placeholder from the question, not real code.
        downloadFutures.append(executor.submit(downloadThread, arguments.....))
        # Stagger submissions by 1-3 s (presumably to avoid hammering the
        # remote service -- TODO confirm; it does not provide thread safety).
        time.sleep(random.randint(1,3)) 

And my stream class:

# Worker body: each thread writes rows directly to a shared csv writer.
# This shared write is exactly the operation whose thread safety is in question.
def downloadThread(arguments......):

                #Some code.....

                writer.writerow(re.split(',', line.decode()))

Should I set up a separate single-threaded writer to handle the file writing, or is it safe to just append from each thread?

EDIT: To clarify, each thread in the script above appends the rows it downloads to one shared CSV file.

+5
3

The csv writer is not documented as thread-safe. To be safe, every access to it from multiple threads must be guarded, for example with a threading.Lock:

# create the lock
import threading
csv_writer_lock = threading.Lock()

def downloadThread(arguments......):
    # pass csv_writer_lock somehow
    # Note: use csv_writer_lock on *any* access
    # Some code.....
    # Only one thread at a time may execute the guarded write below.
    with csv_writer_lock:
        writer.writerow(re.split(',', line.decode()))

Note that every downloadThread must use the same lock instance; a lock created per thread would not serialize anything.

+11

As a side note — if you also need to handle unicode text, you can wrap the writer in a thread-safe class like this:

def ensure_bytes(s):
    """Encode a text string to UTF-8 bytes; pass any other value through.

    Python 2's ``csv`` module requires byte strings, so unicode cells must
    be encoded before writing.

    :param s: a cell value; text is encoded, everything else is returned as-is.
    :return: UTF-8 ``bytes`` for text input, otherwise ``s`` unchanged.
    """
    # ``unicode`` only exists on Python 2 (where ``str is bytes``); on
    # Python 3 the text type is ``str``.  Guarding the lookup this way keeps
    # the helper working on both versions instead of raising NameError on 3.x.
    text_type = unicode if str is bytes else str  # noqa: F821
    return s.encode('utf-8') if isinstance(s, text_type) else s

class ThreadSafeWriter(object):
    """A ``csv.writer`` wrapper that serializes writes with a lock.

    All rows are passed through :func:`ensure_bytes` first, so unicode cells
    are UTF-8-encoded (needed by Python 2's ``csv`` module).  The underlying
    writer is only ever touched while holding an internal ``threading.Lock``,
    so a single instance can safely be shared between threads.

    >>> from StringIO import StringIO
    >>> f = StringIO()
    >>> wtr = ThreadSafeWriter(f)
    >>> wtr.writerow(['a', 'b'])
    >>> f.getvalue() == "a,b\\r\\n"
    True
    """

    def __init__(self, *args, **kwargs):
        # All arguments are forwarded verbatim to csv.writer.
        self._writer = csv.writer(*args, **kwargs)
        self._lock = threading.Lock()

    def _encode(self, row):
        # Normalize every cell to bytes before it reaches the csv module.
        return [ensure_bytes(cell) for cell in row]

    def writerow(self, row):
        # Encode outside the lock; hold the lock only for the actual write.
        row = self._encode(row)
        with self._lock:
            return self._writer.writerow(row)

    def writerows(self, rows):
        # Lazy generator: rows are encoded as the underlying writer consumes
        # them, but the whole batch is written under one lock acquisition.
        rows = (self._encode(row) for row in rows)
        with self._lock:
            return self._writer.writerows(rows)

# example:
with open('some.csv', 'w') as f:
    writer = ThreadSafeWriter(f)
    # The original called writer.write(...), a method ThreadSafeWriter does
    # not define (AttributeError); the class's API is writerow/writerows.
    writer.writerow([u'中文', 'bar'])

+2

A late-to-the-party note: you could handle this differently, without any locking, by having a single writer thread consume rows from a shared queue while the worker threads push rows onto it.

from threading import Thread
from queue import Queue
from random import randint
from concurrent.futures import ThreadPoolExecutor


# CSV writer setup goes here

# Unbounded FIFO shared between the producer tasks and the single consumer.
queue = Queue()


def consume():
    """Single consumer: drain items from the shared queue one at a time.

    Because only this one thread performs the CSV writing, no lock is needed
    around the writer.  Exits once the sentinel value 4999 is seen.
    """
    while True:
        # Blocking get() instead of polling queue.empty() in a tight loop --
        # the original busy-waited, burning a full CPU core while idle.
        i = queue.get()

        # Row comes out of queue; CSV writing goes here

        print(i)
        if i == 4999:  # sentinel: the last value produced by the example
            return


# Daemon flag set via the constructor: Thread.setDaemon() is deprecated
# since Python 3.10 in favor of the `daemon` attribute/keyword.
consumer = Thread(target=consume, daemon=True)
consumer.start()


def produce(i):
    """Worker task: process one item and hand the resulting row to the queue."""
    # Data processing goes here; row goes into queue
    queue.put(i)


# Fan out 5000 producer tasks across 10 worker threads; the pool's context
# manager blocks here until all submitted tasks have completed.
with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(5000):
        executor.submit(produce, i)

# NOTE(review): consume() exits on i == 4999, but tasks complete out of
# order, so 4999 is not guaranteed to be the last item enqueued -- items
# queued after it would be dropped. Verify the sentinel before reusing this.
consumer.join()
+1
source

Source: https://habr.com/ru/post/1611464/


All Articles