Dumping a ConcurrentBag filled by multiple threads to a file is not fast enough

I wrote this code to run string comparisons between a large number of lines in parallel, to speed things up.

I used a ConcurrentBag so that all threads (tasks) could write to a thread-safe collection, and then I dump this collection to a file.

The problem is that the ConcurrentBag<string> log, which I dump into the file, fills up faster than it can be written out. As a result, my program consumes more and more memory until it runs out.

My question is: what can I do? Improve the logging? Suspend the tasks until the ConcurrentBag has been drained, and then resume them? What would be the fastest option?

Here is the code:

    CsvWriter csv = new CsvWriter(@"C:\test.csv");
    List<Bailleur> bailleurs = DataLoader.LoadBailleurs();
    ConcurrentBag<string> log = new ConcurrentBag<string>();
    int i = 0;

    var taskWriteToLog = new Task(() =>
    {
        // Consume the items in the bag
        string item;
        while (true) // (!log.IsEmpty)
        {
            if (!log.IsEmpty)
            {
                if (log.TryTake(out item))
                {
                    csv.WriteLine(item);
                }
                else
                    Console.WriteLine("Concurrent Bag busy");
            }
            else
            {
                System.Threading.Thread.Sleep(1000);
            }
        }
    });

    taskWriteToLog.Start();

    Parallel.ForEach(bailleurs, s1 =>
    {
        foreach (Bailleur s2 in bailleurs)
        {
            var lcs2 = LongestCommonSubsequenceExtensions.LongestCommonSubsequence(s1.Name, s2.Name);
            string line = String.Format("\"LCS\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, lcs2.Item2);
            log.Add(line);

            var dic = DiceCoefficientExtensions.DiceCoefficient(s1.Name, s2.Name);
            line = String.Format("\"DICE\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, dic);
            log.Add(line);
        }
        i++; // NOTE: i++ is not atomic; inside Parallel.ForEach this counter can lose updates
        Console.WriteLine(i);
    });

    public class CsvWriter
    {
        public string FilePath { get; set; }
        private FileStream _fs { get; set; }
        private StreamWriter _sw { get; set; }

        public CsvWriter(string filePath) // was "CsvWriter2", which would not compile
        {
            FilePath = filePath;
            _fs = new FileStream(FilePath, FileMode.Create, FileAccess.Write);
            _sw = new StreamWriter(_fs);
        }

        public void WriteLine(string line)
        {
            _sw.WriteLine(line);
        }
    }
+4
3 answers

Don't use the ConcurrentBag directly; use a BlockingCollection, which takes a concurrent collection as its backing store (by default, a ConcurrentQueue).

One of the constructor overloads lets you set an upper bound on the size of the collection; if the collection is full, Add blocks until there is room for the insert.
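For example (a minimal sketch of the bounded constructor; the capacity of 500 here is arbitrary):

    // Producers calling Add() block once 500 items are queued,
    // which throttles them to the speed of the consumer.
    var log = new BlockingCollection<string>(boundedCapacity: 500);
    log.Add("some line"); // blocks while the collection is full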

It also gives you GetConsumingEnumerable(), which makes it very easy to take items from the collection: you just use it in a foreach loop, and it keeps feeding your consumer data until CompleteAdding is called. After that, it runs until the collection is empty and then exits, like any other IEnumerable that has completed. If the collection "runs dry" before CompleteAdding is called, it blocks the thread and resumes automatically when more data arrives.

    void ProcessLog()
    {
        CsvWriter csv = new CsvWriter(@"C:\test.csv");
        List<Bailleur> bailleurs = DataLoader.LoadBailleurs();

        const int MAX_BAG_SIZE = 500;
        BlockingCollection<string> log = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_BAG_SIZE);

        int i = 0;

        var taskWriteToLog = new Task(() =>
        {
            // Consume the items in the collection. No need for sleeps or polling:
            // when items are available it runs; when the collection is empty but
            // CompleteAdding has not been called, it blocks.
            foreach (string item in log.GetConsumingEnumerable())
            {
                csv.WriteLine(item);
            }
        });

        taskWriteToLog.Start();

        Parallel.ForEach(bailleurs, s1 =>
        {
            // Snip... You can switch to BlockingCollection without any changes
            // to this section of the code.
        });

        // Lets anyone using GetConsumingEnumerable know that no new items are
        // coming, so they can leave the foreach loop once the collection is empty.
        log.CompleteAdding();
    }
+7

Use BlockingCollection instead of ConcurrentBag

    BlockingCollection<string> log = new BlockingCollection<string>();
    var item = log.Take();

In this case, Take blocks until an element is available, so you no longer need to check log.IsEmpty. There is also no need for Thread.Sleep.

    while (true)
    {
        var item = log.Take();
        // Do something with item...
    }
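One caveat worth adding (my note, not part of the original answer): once CompleteAdding() has been called and the collection is empty, Take() throws InvalidOperationException, which gives the consumer loop a way to terminate. A minimal sketch:

    try
    {
        while (true)
        {
            var item = log.Take(); // blocks until an item is available
            csv.WriteLine(item);
        }
    }
    catch (InvalidOperationException)
    {
        // Take() throws once the collection is marked complete and drained,
        // so this is the normal exit path for the consumer.
    }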
+2

First, it looks like you are writing the file one small string at a time.

If you can gather the data and write it in large blocks, it will be faster. You are probably hitting the maximum IOPS of the device you are writing to. Your lines are tiny, so your write pattern looks like 4K random IO, or worse.

Using a different collection will not change the fact that writing to disk is the slowest thing you do.

This may not be possible with a concurrent collection alone, but if you can take the lines out of your bag and combine them into one large string/byte array closer to 1-5 MB, you should see better performance. (You may need to insert the CR LF line endings back into the string.)
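A rough sketch of that batching idea (my own illustration, not from the original answer): it assumes the BlockingCollection consumer from the answers above, and a hypothetical bulk Write(string) method added to CsvWriter; the 1 MB threshold is arbitrary.

    var sb = new StringBuilder();
    foreach (string item in log.GetConsumingEnumerable())
    {
        sb.AppendLine(item); // AppendLine restores the CR LF
        if (sb.Length >= 1024 * 1024) // roughly 1 MB of characters; tune to taste
        {
            csv.Write(sb.ToString()); // hypothetical bulk Write(string) method
            sb.Clear();
        }
    }
    if (sb.Length > 0)
    {
        csv.Write(sb.ToString()); // flush the final partial batch
    }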

0

Source: https://habr.com/ru/post/1497375/

