Is there an idiomatic way to route elements that do not work in a TransformBlock in a TPL data flow graph?

I use the TPL data stream to create a bufferBlock of input elements that are processed by a TransformBlock that outputs to the output bufferBlock

inputQueue = new BufferBlock<InputPacket>;
processQueue = new TransformBlock <InputPacket, OutputPacket>;
outputQueue = new BufferBlock<OutputPacket>;

inputQueue.LinkTo(processQueue, new DataflowLinkOptions { PropagateCompletion = true });
processQueue.LinkTo(outputQueue, new DataflowLinkOptions { PropagateCompletion = true });

Is there an idiomatic way to route items that are not running?

Upon completion of the processing of elements, the InputPacketaction associated with processQueuewill return OutputPacket, which will be redirected tooutputQueue

If the action associated with processQueueinvokes an unreliable web service, then processing some elements InputPacketwill time out, and I would like to repeat these elements x times. But I do not want to try them right away, I want to return them to the input queue.

InputPacket, - inputQueue, x , failureQueue:

BufferBlock<CallPacket> failureQueue = new BufferBlock<InputPacket>;

LinkTo :

InputPacket OutputPacket

, , :

processQueue = new TransformBlock <InputPacket, ParentPacketType>;

.

inputElement InputPacket,

.

+4
1

-, , , , TPL Dataflow. , . , .

, TPL Dataflow , , , , . :

public static IPropagatorBlock<TInput, TOutput> CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform, int retryCount,
    ITargetBlock<(TInput, Exception)> failureBlock)
{
    var failedInputs = new Dictionary<TInput, int>();

    TransformManyBlock<TInput, TOutput> resultBlock = null;

    resultBlock = new TransformManyBlock<TInput, TOutput>(
        async input =>
        {
            try
            {
                return new[] { transform(input) };
            }
            catch (Exception exception)
            {
                failedInputs.TryGetValue(input, out int count);

                if (count < retryCount)
                {
                    failedInputs[input] = count + 1;
                    // ignoring the returned Task, to avoid deadlock
                    _ = resultBlock.SendAsync(input);
                }
                else
                {
                    failedInputs.Remove(input);
                    await failureBlock.SendAsync((input, exception));
                }

                return Array.Empty<TOutput>();
            }
        });

    return resultBlock;
}

, :

  • # 7.0. , , , .
  • , . Dictionary , failureBlock.
  • . , .
  • parallelism. , (, , ConcurrentDictionary Dictionary).
  • (: GetHashCode ), . - .
+3

Source: https://habr.com/ru/post/1679578/


All Articles