The following code does not run in parallel, and I don't know exactly why:
import multiprocessing

def run_normalizers(config, debug, num_threads, name=None):
    def _run():
        print('Started process for normalizer')
        sqla_engine = init_sqla_from_config(config)
        image_vfs = create_s3vfs_from_config(config, config.AWS_S3_IMAGE_BUCKET)
        storage_vfs = create_s3vfs_from_config(config, config.AWS_S3_STORAGE_BUCKET)
        pp = PipedPiper(config, image_vfs, storage_vfs, debug=debug)
        if name:
            pp.run_pipeline_normalizers(name)
        else:
            pp.run_all_normalizers()
        print('Normalizer process complete')

    threads = []
    for i in range(num_threads):
        threads.append(multiprocessing.Process(target=_run))
    [t.start() for t in threads]
    [t.join() for t in threads]

run_normalizers(...)
The config variable is just a dictionary defined outside of the _run() function. All the processes seem to be created, but the job is no faster than when I run it with a single process. Basically, each run_*_normalizers() function reads from a queue table in the database (via SQLAlchemy), makes several HTTP requests, runs the pipeline of normalizers to modify the data, and then saves the results back to the database. I come from JVM-land, where threads are "heavy" and commonly used for parallelism, so I'm a little confused: I thought the multiprocessing module was supposed to bypass the Python GIL limitations.
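For context, each worker roughly does something like the following (a simplified sketch, not my real code; fetch names such as process_queue, the queue table columns, and pipeline.normalize are stand-ins for my actual helpers):

    import requests
    from sqlalchemy import text

    def process_queue(sqla_engine, pipeline):
        # Pull a batch of pending rows from the queue table.
        with sqla_engine.connect() as conn:
            rows = conn.execute(
                text("SELECT id, url FROM queue WHERE state = 'pending'")
            ).fetchall()

        for row in rows:
            # Network-bound step: fetch the raw data over HTTP.
            raw = requests.get(row.url).content
            # CPU-bound step: run the normalizer pipeline on the data.
            normalized = pipeline.normalize(raw)
            # Write the result back to the database.
            with sqla_engine.begin() as conn:
                conn.execute(
                    text("UPDATE queue SET payload = :p, state = 'done' WHERE id = :id"),
                    {"p": normalized, "id": row.id},
                )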
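My mental model of why multiprocessing should sidestep the GIL is roughly this: each Process gets its own interpreter with its own GIL, so a CPU-bound task like the one below should scale across cores (a minimal, self-contained sketch, not related to my actual normalizers):

    import multiprocessing
    import time

    def burn_cpu(n):
        # Pure-Python CPU-bound loop; threads in one process would be serialized by the GIL.
        total = 0
        for i in range(n):
            total += i * i
        return total

    if __name__ == '__main__':
        start = time.time()
        procs = [multiprocessing.Process(target=burn_cpu, args=(10_000_000,)) for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # On a 4-core machine this should take roughly the time of a single burn_cpu call.
        print('elapsed:', time.time() - start)

That is why I expected the same speed-up from spawning multiple normalizer processes.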