Notify task when other tasks complete

.NET TPL experts,

Note: I cannot use the Dataflow library. No add-ons are allowed.

I have four tasks, as shown in the diagram below:

[Diagram of the four tasks; image not available]

  • task_1 (data_producer) -> reads records from a large file (> 500,000 records) and adds them to a BlockingCollection.

  • task_2, task_3 (data_consumers) -> each of these tasks takes records from the BlockingCollection, does some network-related work on each record, and when that work is done can add a record to the results queue. Processing order is NOT important.

  • task_4 (results processor) -> takes records from the results queue and writes them to the output file.

Then I wait for the completion of tasks, i.e.:

Task.WhenAll( t1, t2, t3, t4 )

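So I have one producer task, multiple consumer tasks, and one task that saves the results.

My question is:

How do I notify task 4 when tasks 2 and 3 have completed, so that task 4 also knows when to end?

I have found many examples that "move" data from one task to another in a linear "pipeline" fashion, but none that illustrate the case above; that is, how to notify task 4 when tasks 2 and 3 are complete, so that it knows when to complete as well.

My initial thought is to "register" tasks 2 and 3 with task 4 and simply monitor the state of each registered task: when tasks 2 and 3 are no longer running, task 4 can stop (if the results queue is also empty).

Thanks in advance.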


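This extends a little what Thomas already said.

With a BlockingCollection, you can call GetConsumingEnumerable() on it and treat it like a normal foreach loop. This lets your tasks end "naturally". The only thing you need to add is one extra task that watches tasks 2 and 3, sees when they have finished, and then calls CompleteAdding() on the second collection.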

private BlockingCollection<Stage1> _stageOneBlockingCollection = new BlockingCollection<Stage1>();
private BlockingCollection<Stage2> _stageTwoBlockingCollection = new BlockingCollection<Stage2>();

Task RunProcess()
{
    Task1Start();
    var t2 = Stage2Start();
    var t3 = Stage2Start();
    Stage2MonitorStart(t2,t3);
    return Stage4Start();
}

public void Task1Start()
{
    Task.Run(()=>
    {
        foreach(var item in GetFileSource())
        {
            var processedItem = Process(item);
            _stageOneBlockingCollection.Add(processedItem);
        }
        _stageOneBlockingCollection.CompleteAdding();
    });
}

public Task Stage2Start()
{
    return Task.Run(()=>
    {
        foreach(var item in _stageOneBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage2(item);
            _stageTwoBlockingCollection.Add(processedItem);
        }
    });
}

void Stage2MonitorStart(params Task[] tasks)
{
    //Once all tasks complete mark the collection complete adding.
    Task.WhenAll(tasks).ContinueWith(t=>_stageTwoBlockingCollection.CompleteAdding());
}

public Task Stage4Start()
{
    return Task.Run(()=>
    {
        foreach(var item in _stageTwoBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage4(item);
            WriteToOutputFile(processedItem);
        }
    });
}
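
For reference, here is the same idea as a self-contained sketch that compiles and runs on its own. The class name PipelineSketch, the Enumerable.Range input, the `record * 2` "work" and the in-memory list standing in for the output file are all placeholders assumed for illustration; they are not part of the code above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Runs producer -> two consumers -> result writer and returns what the writer collected.
    public static List<int> Run(int recordCount)
    {
        var input = new BlockingCollection<int>(boundedCapacity: 1000);
        var results = new BlockingCollection<int>();
        var written = new List<int>(); // stands in for the output file; only task 4 touches it

        // task_1: produce records, then signal that no more input will arrive.
        Task t1 = Task.Run(() =>
        {
            foreach (int record in Enumerable.Range(0, recordCount))
                input.Add(record);
            input.CompleteAdding();
        });

        // task_2 / task_3: the foreach ends by itself once input is completed and drained.
        Func<Task> startConsumer = () => Task.Run(() =>
        {
            foreach (int record in input.GetConsumingEnumerable())
                results.Add(record * 2); // placeholder for the network-related work
        });
        Task t2 = startConsumer();
        Task t3 = startConsumer();

        // Monitor: complete the results queue exactly once, after BOTH consumers are done.
        // ContinueWith also runs if a consumer faulted, so task 4 is never left waiting.
        Task monitor = Task.WhenAll(t2, t3).ContinueWith(_ => results.CompleteAdding());

        // task_4: ends by itself once results is completed and drained.
        Task t4 = Task.Run(() =>
        {
            foreach (int result in results.GetConsumingEnumerable())
                written.Add(result);
        });

        Task.WaitAll(t1, t2, t3, monitor, t4);
        return written;
    }
}
```

Calling PipelineSketch.Run(500000) returns one result per input record, in no particular order. The bounded capacity on the input collection is optional; it only keeps the producer from reading the whole file into memory ahead of the consumers.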

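If you also use a BlockingCollection for results_queue, you can implement these notifications with the BlockingCollection.IsCompleted and BlockingCollection.IsAddingCompleted properties. The process is:

  • task1 calls BlockingCollection.CompleteAdding() on the input collection when there are no more records in the input file.
  • task2 and task3 regularly check the IsCompleted property of the input collection. It becomes true once the collection is empty and the producer has called CompleteAdding(). At that point tasks 2 and 3 are done; once both have finished, CompleteAdding() is called on the results queue.
  • task4 can process the records in result_queue as they arrive, or wait until IsAddingCompleted becomes true and only then start processing. The work of task4 is finished when IsCompleted is true on the results queue.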
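
The difference between the two properties can be seen in a small sketch. The class name CompletionFlagsDemo and the single-item queue are made up for illustration; only the BlockingCollection members themselves come from the framework.

```csharp
using System;
using System.Collections.Concurrent;

public static class CompletionFlagsDemo
{
    // Returns snapshots of the two flags at three points in the queue's life.
    public static bool[] Run()
    {
        var queue = new BlockingCollection<int>();
        queue.Add(1);

        bool addingBefore = queue.IsAddingCompleted;    // false: producer may still add
        bool completedBefore = queue.IsCompleted;       // false

        queue.CompleteAdding();
        bool addingAfterMark = queue.IsAddingCompleted; // true: no more Add calls allowed
        bool completedAfterMark = queue.IsCompleted;    // false: one item is still queued

        queue.Take();                                   // drain the last item
        bool completedAfterDrain = queue.IsCompleted;   // true: marked complete AND empty

        return new[]
        {
            addingBefore, completedBefore,
            addingAfterMark, completedAfterMark,
            completedAfterDrain
        };
    }
}
```

In other words, IsAddingCompleted tells a consumer that the producer is done, while IsCompleted tells it that there is also nothing left to take.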

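Edit: I am not sure whether you are familiar with the IsCompleted and IsAddingCompleted properties. They are different, and they fit your case well. I don't think you need any synchronization primitives other than the BlockingCollection properties. Please ask if more explanation is needed!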

    BlockingCollection<int> inputQueue;
    BlockingCollection<int> resultQueue;

    public void StartTasks()
    {
        inputQueue = new BlockingCollection<int>();
        resultQueue = new BlockingCollection<int>();

        Task task1 = Task.Run(() => Task1());
        Task task2 = Task.Run(() => Task2_3());
        Task task3 = Task.Run(() => Task2_3());
        Task[] tasksInTheMiddle = new Task[] { task2, task3 };
        // Complete the result queue once, after both middle tasks have finished.
        Task waiting = Task.WhenAll(tasksInTheMiddle).ContinueWith(x => resultQueue.CompleteAdding());
        Task task4 = Task.Run(() => Task4());

        //Waiting for tasks to finish
    }
    private void Task1()
    {
        while(true)
        {
            int? input = ReadFromInputFile();
            if (input != null)
            {
                inputQueue.Add((int)input);
            }
            else
            {
                inputQueue.CompleteAdding();
                break;
            }
        }
    }

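    // Requires: using System.Threading; (for Timeout.Infinite)
    private void Task2_3()
    {
        // Loop until the input queue is both marked complete and empty.
        while (!inputQueue.IsCompleted)
        {
            // TryTake blocks until an item arrives and returns false (instead of
            // throwing, as Take would) if the queue is completed and drained meanwhile.
            int input;
            if (inputQueue.TryTake(out input, Timeout.Infinite))
            {
                resultQueue.Add(input);
            }
        }
    }

    private void Task4()
    {
        // Loop until the result queue is both marked complete and empty.
        while (!resultQueue.IsCompleted)
        {
            int result;
            if (resultQueue.TryTake(out result, Timeout.Infinite))
            {
                WriteToOutputFile(result);
            }
        }
    }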

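The task you describe fits nicely into the TPL Dataflow library, a small add-on for TPL itself (it can be added to a project as a NuGet package; .NET 4.5 is supported). You can easily set up the flow like this (code updated, based on the comments, to use BroadcastBlock):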

// BroadcastBlock requires a cloning function; strings are immutable, so identity is enough.
var buffer = new BroadcastBlock<string>(s => s);
var consumer1 = new TransformBlock<string, string>(s => { /* your action here for a string */ return s; });
var consumer2 = new TransformBlock<string, string>(s => { /* your action here for a string */ return s; });
var resultsProcessor = new ActionBlock<string>(s => { /* your logging logic here */ });

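After that, you link the blocks together and start feeding data in (you can use the Post method instead if you don't want to be asynchronous), like this: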

buffer.LinkTo(consumer1, new DataflowLinkOptions { PropagateCompletion = true });
buffer.LinkTo(consumer2, new DataflowLinkOptions { PropagateCompletion = true });
// Two sources feed resultsProcessor, so completion is not propagated on these links:
// the first consumer to finish would otherwise complete resultsProcessor too early.
consumer1.LinkTo(resultsProcessor);
consumer2.LinkTo(resultsProcessor);
Task.WhenAll(consumer1.Completion, consumer2.Completion)
    .ContinueWith(_ => resultsProcessor.Complete());

foreach (var s in IncomingData)
{
    await buffer.SendAsync(s);
}
buffer.Complete();

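First of all, note that BroadcastBlock offers a copy of every message to each linked consumer (so here both consumers see every record), and it only keeps the most recent message: a consumer that cannot accept a message in time (for example, one with a bounded capacity) can "miss" it, so don't limit the consumers' capacity.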

Finally, to find out when everything is done, use the .Completion property of resultsProcessor:

resultsProcessor.Completion.ContinueWith(t => { /* Processing is complete */ });

Source: https://habr.com/ru/post/1661628/

