Multithreading with a large number of file I/O tasks

I'm not completely new to C#, but I don't know the language well enough to work out how to do what I need.

I have a file, call it File1.txt, with 100,000 lines or so. I will duplicate File1.txt and call the copy File1_untested.txt. I will also create an empty file, Successes.txt. For each line in the file:

  • Delete this line from File1_untested.txt
  • If this line passes the test, write it in Successes.txt

So my question is, how can I multithread this?

So far, my approach has been to create an object (LineChecker), give the object its string to check, and pass the object to the ThreadPool. I understand how to use the ThreadPool for a handful of tasks with a CountdownEvent. However, it seems unreasonable to queue 100,000 tasks at once. How can I feed the pool gradually? Maybe 1000 lines at a time, or something like that.

In addition, I need to make sure that no two threads add to Successes.txt or remove from File1_untested.txt at the same time. I can handle this with lock(), right? What should I pass to lock()? Can I use a static member of LineChecker?

I'm just trying to get a broad idea of how something like this can be designed.


Assuming the test is a method like this:

private bool Test(string line)
{
    // This test is expensive; replace the placeholder below with the real check
    throw new NotImplementedException();
}

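The code below uses a BlockingCollection bounded to 10 items. One task reads lines from the file into the collection, and Parallel.ForEach takes them out, runs the test, and writes the lines that pass to the success file. Because the collection holds at most 10 lines, the reader blocks whenever it gets ahead of the workers.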

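// Namespaces needed: System.Collections.Concurrent, System.IO, System.Threading, System.Threading.Tasks
CancellationTokenSource cancellation_token_source = new CancellationTokenSource();

CancellationToken cancellation_token = cancellation_token_source.Token;

// Bounded to 10 items: Add blocks while the collection is full
BlockingCollection<string> blocking_collection = new BlockingCollection<string>(10);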

using (StreamReader reader = new StreamReader(new FileStream(filename, FileMode.Open, FileAccess.Read)))
{
    using (
        StreamWriter writer =
            new StreamWriter(new FileStream(success_filename, FileMode.OpenOrCreate, FileAccess.Write)))
    {

        var input_task = Task.Factory.StartNew(() =>
        {
            try
            {
                while (!reader.EndOfStream)
                {
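                    if (cancellation_token.IsCancellationRequested)
                        return;

                    // Passing the token lets a cancel request unblock Add when the collection is full
                    blocking_collection.Add(reader.ReadLine(), cancellation_token);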
                }
            }
            // Always mark the collection as complete, even after an exception.
            // Parallel.ForEach will not exit until CompleteAdding is called.
            finally
            {
                blocking_collection.CompleteAdding();
            }
        });


        try
        {
            Parallel.ForEach(blocking_collection.GetConsumingEnumerable(), (line) =>
            {
                bool test_result = Test(line);

                if (test_result)
                {
                    lock (writer)
                    {
                        writer.WriteLine(line);
                    }
                }
            });
        }
        catch
        {
            cancellation_token_source.Cancel(); // If Parallel.ForEach throws an exception, tell the input task to stop
            throw;
        }

        input_task.Wait(); //This will make sure that exceptions thrown in the input thread will be propagated here
    }
}
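
On the question of what to pass to lock(): the code above locks on the writer itself. A common alternative is a dedicated private object used only for locking, which can be a static member. The following is a minimal sketch of that idea, not the answer's own code; SuccessWriterSketch, WriterLock, WritePassingLines, and the stand-in Test are all hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class SuccessWriterSketch
{
    // Dedicated lock object (hypothetical name). It is private, so no
    // outside code can lock on it by accident.
    private static readonly object WriterLock = new object();

    // Hypothetical stand-in for the expensive test: passes even-length lines.
    private static bool Test(string line)
    {
        return line.Length % 2 == 0;
    }

    // Tests the lines in parallel and writes those that pass to output.
    public static void WritePassingLines(IEnumerable<string> lines, TextWriter output)
    {
        Parallel.ForEach(lines, line =>
        {
            if (!Test(line))
                return;

            // Only one thread at a time may touch the shared writer.
            lock (WriterLock)
            {
                output.WriteLine(line);
            }
        });
    }
}
```

Calling SuccessWriterSketch.WritePassingLines(File.ReadLines(path), writer) would stream a file through it. The order of the written lines is not guaranteed, because the workers finish in whatever order the scheduler allows.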


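I would represent each line as an object of a class, MyLine.

Then use a BlockingCollection, as @Paul suggested.

A BlockingCollection can be given a bounded capacity. For example, to cap it at 10 items: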

var sourceCollection = new BlockingCollection<MyLine>(10);

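sourceCollection holds MyLine objects, at most 10 at a time: once it is full, adding another MyLine blocks until one is removed. Processed items go from sourceCollection into a second collection, resultCollection.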
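
A minimal sketch of how a bounded sourceCollection can feed a resultCollection, under stated assumptions: the members of MyLine, the BoundedPipelineSketch class, its Run method, and the stand-in Test are hypothetical, since the original does not show them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical shape for MyLine; the original does not show its members.
public sealed class MyLine
{
    public MyLine(string text) { Text = text; }
    public string Text { get; }
    public bool Passed { get; set; }
}

public static class BoundedPipelineSketch
{
    // Hypothetical stand-in for the expensive test.
    private static bool Test(string text)
    {
        return text.StartsWith("ok", StringComparison.Ordinal);
    }

    public static List<MyLine> Run(IEnumerable<string> lines, int workerCount)
    {
        // Add blocks while 10 items are already waiting in sourceCollection.
        var sourceCollection = new BlockingCollection<MyLine>(10);
        var resultCollection = new BlockingCollection<MyLine>(); // unbounded

        var producer = Task.Run(() =>
        {
            try
            {
                foreach (var text in lines)
                    sourceCollection.Add(new MyLine(text));
            }
            finally
            {
                // Without this the consumers below would wait forever.
                sourceCollection.CompleteAdding();
            }
        });

        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (var item in sourceCollection.GetConsumingEnumerable())
                {
                    item.Passed = Test(item.Text);
                    resultCollection.Add(item);
                }
            }))
            .ToArray();

        Task.WaitAll(workers);
        producer.Wait(); // surfaces any exception thrown while producing
        resultCollection.CompleteAdding();
        return resultCollection.ToList();
    }
}
```

Passing a lazy sequence such as File.ReadLines(path) as lines means the file is never held in memory all at once: the producer can only get 10 items ahead of the workers.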


resultCollection could also be a List. (A List is not thread-safe, so if several threads add to it, every access must be synchronized, for example with a lock.)


Source: https://habr.com/ru/post/1609133/

