How to handle events in batches with an Elixir stream

I have a csv_file for which a.) first, each line must be converted to XML, and b.) second, the converted XML is sent to the rails side for a database write operation.

Below is my Flow code for it.

flow = csv_rows
|> Flow.from_enumerable()
|> Flow.partition()
|> Flow.map(&CSV.generate_xml/1)
|> Flow.map(&CSV.save_to_rails_database/1)
|> Flow.run()

Everything works fine for a small csv file, but when the csv_file is very large (say 20,000 records), the second operation (i.e. writing to the database on the rails side) tries to insert too many records at the same time: Elixir sends too many simultaneous requests to the rails side, and the database reaches its connection limit.

Would it be good to handle the events in batches of 50, and would min_demand and max_demand be useful in this case?

1 answer

You can use Flow.map_state/2 to receive the whole state of a stage (in your case, since you are only mapping, the state will be the events in that batch).
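
As a rough sketch only: this assumes an older Flow version (map_state/2 was later replaced by on_trigger/2) and a hypothetical batch-insert helper CSV.save_batch_to_rails_database/1 that writes a whole batch in one request. Batching via count windows could look like this:

# Each count window accumulates 50 events, then triggers (size is illustrative).
window = Flow.Window.count(50)

csv_rows
|> Flow.from_enumerable()
|> Flow.partition(window: window, stages: 4)
|> Flow.reduce(fn -> [] end, fn row, batch -> [CSV.generate_xml(row) | batch] end)
|> Flow.map_state(fn batch ->
  # hypothetical helper: one request to the rails side per batch
  CSV.save_batch_to_rails_database(batch)
  # return an empty state so nothing is emitted downstream
  []
end)
|> Flow.run()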

You will need three parameters here, all given to from_enumerable (a sketch follows the list):

  • min_demand: this will be the effective batch size
  • max_demand: the maximum number of rows that will be in flight between stages
  • stages: the number of stages processing the data concurrently; in your case, how many batches are processed at the same time
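
For example, a minimal sketch of the question's pipeline with those options set (the numbers are illustrative, not tuned):

batch_size = 50

csv_rows
|> Flow.from_enumerable(
  min_demand: batch_size,       # effective batch size per stage
  max_demand: batch_size * 2,   # at most this many rows in flight per stage
  stages: 4                     # four stages hitting the rails side concurrently
)
|> Flow.map(&CSV.generate_xml/1)
|> Flow.map(&CSV.save_to_rails_database/1)
|> Flow.run()

Since nothing in this pipeline groups events by key, the Flow.partition call from the question adds an extra hop without benefit, so it is dropped in this sketch.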

A few other considerations:

That said, since you are only mapping over the rows, you may not need Flow at all: Task.async_stream/3, which ships with Elixir, also gives you concurrency with back-pressure, and keeps the batching explicit:

batch_size = 100

# 8 tasks running at the same time; we don't care about the order of the results
async_options = [max_concurrency: 8, ordered: false]

csv_rows
|> Stream.chunk_every(batch_size)
|> Task.async_stream(fn batch ->
  batch
  |> Enum.map(&CSV.generate_xml/1)
  |> CSV.save_to_batch_rails_database()
end, async_options)
|> Stream.run()

This chunks the rows into batches and processes them concurrently, with back-pressure and without an extra dependency. If you later need to partition, group, or reduce events, that is when reaching for Flow pays off.


Source: https://habr.com/ru/post/1691978/

