TPL Dataflow: encapsulating a pipeline that ends in an ActionBlock

TPL Dataflow provides a very useful function:

public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
    ITargetBlock<TInput> target, 
    ISourceBlock<TOutput> source)

so that you can encapsulate multiple blocks into a single transform block. It returns

IPropagatorBlock<TInput, TOutput>

which represents the start and end blocks of your pipeline.

However, if the last block in my pipeline is an ActionBlock, I cannot use this since the ActionBlock is not a SourceBlock, and the type of the function returned would be ITargetBlock, not IPropagatorBlock.

Essentially, I'm looking for something like this function:

public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
        ITargetBlock<TStart> startBlock, 
        ActionBlock<TEnd> endBlock)

Is this a reasonable thing to write, or am I missing something simple? I'm not quite sure how to write it, especially the completion wiring. Should I create my own custom block type?

EDIT:

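Thanks for the comments, in particular from @Panagiotis Kanavos. I ended up writing my own wrapper, modelled on EncapsulatingPropagator, the internal class behind DataflowBlock.Encapsulate: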

/// <summary>
/// Exposes a pipeline that starts with <c>startBlock</c> and ends in an
/// <see cref="ActionBlock{TEnd}"/> as a single <see cref="ITargetBlock{TStart}"/>.
/// Messages, <see cref="Complete"/> and faults are forwarded to the start block, while
/// <see cref="Completion"/> reports the completion of the end block.
/// </summary>
/// <remarks>
/// This class does not link anything. It assumes the caller has already linked the start
/// block through to the end block with completion propagation; otherwise calling
/// <see cref="Complete"/> here will never cause <see cref="Completion"/> to finish.
/// </remarks>
/// <typeparam name="TStart">Message type accepted by the first block.</typeparam>
/// <typeparam name="TEnd">Message type consumed by the final action block.</typeparam>
internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
    private readonly ITargetBlock<TStart> startBlock;

    private readonly ActionBlock<TEnd> endBlock;

    /// <summary>Wraps an already-linked pipeline.</summary>
    /// <param name="startBlock">Block that receives every offered message.</param>
    /// <param name="endBlock">Final block whose completion represents the whole pipeline.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
    {
        // Fail at construction time instead of with a NullReferenceException on first use.
        if (startBlock == null)
        {
            throw new ArgumentNullException(nameof(startBlock));
        }

        if (endBlock == null)
        {
            throw new ArgumentNullException(nameof(endBlock));
        }

        this.startBlock = startBlock;
        this.endBlock = endBlock;
    }

    /// <summary>Completion task of the end block.</summary>
    public Task Completion
    {
        get { return this.endBlock.Completion; }
    }

    /// <summary>Signals the start block that no more messages will arrive.</summary>
    public void Complete()
    {
        this.startBlock.Complete();
    }

    /// <summary>Faults the start block with the given exception.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
    void IDataflowBlock.Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException(nameof(exception));
        }

        this.startBlock.Fault(exception);
    }

    /// <summary>Forwards the offered message unchanged to the start block.</summary>
    /// <returns>Whatever status the start block returns for the offer.</returns>
    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader,
        TStart messageValue,
        ISourceBlock<TStart> source,
        bool consumeToAccept)
    {
        return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
}
+4
2

Encapsulate , , , .

, , , , .

, , :

  • - (), . ActionBlock, , , () .
  • - (), . ActionBlock , .

Encapsulate, ActionBlock source, , .

, , , , , . , ( ), , :

/// <summary>
/// Sketch of a pipeline fragment kept as a plain class rather than a custom block:
/// it exposes the first block as <see cref="Input"/> and the completion of the last
/// block as <see cref="Completion"/>.
/// </summary>
class MyFragment
{
    /// <summary>First block of the fragment; messages enter here.</summary>
    public TransformBlock<SomeMessage,SomeOther> Input {get;}

    /// <summary>Completion task of the final action block.</summary>
    public Task Completion {get;}

    ActionBlock<SomeOther> _finalBlock;

    public MyFragment()
    {
        Input=new TransformBlock<SomeMessage,SomeOther>(MyFunction);
        _finalBlock=new ActionBlock<SomeOther>(MyMethod);
        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
        Input.LinkTo(_finalBlock,linkOptions);

        // Without this assignment Completion would stay null for the object's lifetime.
        Completion=_finalBlock.Completion;
    }

    // Transformation step; body elided in the sketch.
    private SomeOther MyFunction(SomeMessage msg)
    {
    ...
    }

    // Terminal step; body elided in the sketch.
    private void MyMethod(SomeOther msg)
    {
    ...
    }
}

To send messages into the fragment, post them to Input. To wait for the fragment to finish, await Completion.

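Alternatively, the fragment class can implement ITargetBlock itself, so that it can be used wherever a target block is expected. The Completion property stays as before.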

:

/// <summary>
/// Sketch of the same fragment implementing <see cref="ITargetBlock{SomeMessage}"/> itself,
/// forwarding every interface call to the <c>Input</c> block declared in the elided part.
/// </summary>
class MyFragment:ITargetBlock<SomeMessage>
{
    ....

    /// <summary>Completion task exposed for the whole fragment.</summary>
    public Task Completion {get;}

    /// <summary>Tells the input block that no more messages will arrive.</summary>
    public void Complete()
    {
        Input.Complete();
    }

    /// <summary>Faults the input block with <paramref name="exc"/>.</summary>
    public void Fault(Exception exc)
    {
        // Cast to the interface: the built-in blocks implement Fault explicitly,
        // so it is not callable on the concrete block type.
        ((IDataflowBlock)Input).Fault(exc);
    }

    /// <summary>Forwards the offered message unchanged to the input block.</summary>
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        SomeMessage messageValue,ISourceBlock<SomeMessage> source,bool consumeToAccept)
    {
        // Same reason for the cast as in Fault: OfferMessage is an explicit implementation.
        return ((ITargetBlock<SomeMessage>)Input).OfferMessage(messageHeader,messageValue,source,consumeToAccept);
    }
}

2

@bornfromanegg, , :

/// <summary>
/// Builds a three-stage pipeline (two transforms and a final action), links the stages with
/// completion propagation, and returns it wrapped as a single target block.
/// </summary>
/// <returns>
/// A target that forwards to the first transform and whose completion is that of the final
/// action block.
/// </returns>
public ITargetBlock<SomeMessage> BuildMyFragment()
{
    var input=new TransformBlock<SomeMessage,SomeOther>(MyFunction);
    var step2=new TransformBlock<SomeOther,SomeFinal>(MyFunction2);
    var finalBlock=new ActionBlock<SomeFinal>(MyMethod);

    var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};

    input.LinkTo(step2,linkOptions);
    step2.LinkTo(finalBlock,linkOptions);

    // The wrapper is generic, so its type arguments must be spelled out here.
    return new EncapsulatingTarget<SomeMessage,SomeFinal>(input,finalBlock);
}
+3

, ActionBlock s, , , , .

" " , . ( .)

/// <summary>
/// Exposes a dataflow network as a single <see cref="ITargetBlock{TInput}"/>.
/// Messages, <see cref="Complete"/> and faults are forwarded to the start block, while
/// <see cref="Completion"/> is an arbitrary task supplied by the caller, for example the
/// combined completion of several terminal blocks.
/// </summary>
/// <remarks>
/// This class does not link any blocks; it only stores the start block and the completion
/// task it is given.
/// </remarks>
/// <typeparam name="TInput">Message type accepted by the start block.</typeparam>
public class EncapsulatingTarget<TInput> : ITargetBlock<TInput>
{
    private readonly ITargetBlock<TInput> startBlock;

    private readonly Task completion;

    /// <summary>Wraps a network whose end is represented by an explicit task.</summary>
    /// <param name="startBlock">Block that receives every offered message.</param>
    /// <param name="completion">Task reported as this block's <see cref="Completion"/>.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public EncapsulatingTarget(ITargetBlock<TInput> startBlock, Task completion)
    {
        if (startBlock == null)
        {
            throw new ArgumentNullException(nameof(startBlock));
        }

        if (completion == null)
        {
            throw new ArgumentNullException(nameof(completion));
        }

        this.startBlock = startBlock;
        this.completion = completion;
    }

    /// <summary>Wraps a network whose end is a single block.</summary>
    /// <param name="startBlock">Block that receives every offered message.</param>
    /// <param name="endBlock">
    /// Terminal block whose completion represents the network. Typed as
    /// <see cref="IDataflowBlock"/> because a constructor cannot declare its own type
    /// parameter for the end block's message type; an ActionBlock of any message type
    /// can be passed.
    /// </param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public EncapsulatingTarget(ITargetBlock<TInput> startBlock, IDataflowBlock endBlock)
    {
        if (startBlock == null)
        {
            throw new ArgumentNullException(nameof(startBlock));
        }

        if (endBlock == null)
        {
            throw new ArgumentNullException(nameof(endBlock));
        }

        this.startBlock = startBlock;
        completion = endBlock.Completion;
    }

    /// <summary>The completion task supplied (or derived) at construction.</summary>
    public Task Completion => completion;

    /// <summary>Signals the start block that no more messages will arrive.</summary>
    public void Complete()
    {
        startBlock.Complete();
    }

    /// <summary>Faults the start block with the given exception.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
    void IDataflowBlock.Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException(nameof(exception));
        }

        startBlock.Fault(exception);
    }

    /// <summary>Forwards the offered message unchanged to the start block.</summary>
    /// <returns>Whatever status the start block returns for the offer.</returns>
    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader,
        TInput messageValue,
        ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
}

:

/// <summary>
/// Usage sketch: fans each input record out to two branches (save the input; compute and
/// save results) and returns the network as one target whose completion waits for both
/// terminal action blocks.
/// </summary>
public ITargetBlock<Client.InputRecord> BuildDefaultFinalActions()
{
    var splitter = new BroadcastBlock<Client.InputRecord>(null);
    var getresults = new TransformManyBlock(...);    // propagator
    var saveinput = new ActionBlock(...);
    var saveresults = new ActionBlock(...);

    // NOTE(review): PropagateCompletion is not defined in this snippet; presumably a shared
    // DataflowLinkOptions with PropagateCompletion = true. Confirm against the enclosing class.
    splitter.LinkTo(saveinput, PropagateCompletion);
    splitter.LinkTo(getresults, PropagateCompletion);
    getresults.LinkTo(saveresults, PropagateCompletion);

    // Two terminal blocks, so the combined completion is passed as a task.
    return new Util.EncapsulatingTarget<Client.InputRecord>(splitter, Task.WhenAll(saveinput.Completion, saveresults.Completion));
}

A constructor overload EncapsulatingTarget<T>(ITargetBlock<T> target, params Task[] completions) could call WhenAll(...) internally, so that callers would not have to.

+2

Source: https://habr.com/ru/post/1606424/


All Articles