I have a stream of tokens that are produced very quickly and a consumer that processes them relatively slowly. The tokens come in three subtypes, and I would like them to be processed in order of priority. So I want tokens to be buffered after they are created, while they wait for processing, with the buffer kept sorted by priority.
Here are my classes:
/// <summary>
/// Processing priority of a token. The numeric values increase with urgency,
/// so a plain comparison (<c>a &gt; b</c>) means "a is more urgent than b".
/// </summary>
public enum Priority
{
    High = 3,
    Medium = 2,
    Low = 1
}
/// <summary>
/// Common base for the token payload types; instances are ordered by <see cref="Id"/>.
/// </summary>
public class Base : IComparable<Base>
{
    /// <summary>Identifier used as the sort key.</summary>
    public int Id { get; set; }

    /// <summary>
    /// Compares this instance with <paramref name="other"/> by <see cref="Id"/>.
    /// </summary>
    /// <param name="other">The instance to compare against; may be null.</param>
    /// <returns>
    /// A negative value, zero, or a positive value when this instance's id is less than,
    /// equal to, or greater than <paramref name="other"/>'s id. Returns 1 when
    /// <paramref name="other"/> is null.
    /// </returns>
    public int CompareTo(Base other)
    {
        // IComparable<T> contract: any instance sorts after null.
        if (other == null)
        {
            return 1;
        }

        return Id.CompareTo(other.Id);
    }
}
// Foo, Bar and Baz add no members of their own; they inherit Id and CompareTo from Base.
public class Foo : Base { }
public class Bar : Base { }
public class Baz : Base { }
/// <summary>
/// A work item that always carries a <see cref="Foo"/> and optionally either a
/// <see cref="Bar"/> or a <see cref="Baz"/>. Tokens sort by descending
/// <see cref="Priority"/>; tokens of equal priority sort by the payload that
/// determines that priority.
/// </summary>
public class Token : IComparable<Token>
{
    // Built up across the chained constructors and returned by ToString().
    private readonly string _toString;

    public Foo Foo { get; }
    public Bar Bar { get; }
    public Baz Baz { get; }

    /// <summary>
    /// Low when a <see cref="Baz"/> is present, Medium when a <see cref="Bar"/> is present,
    /// otherwise High.
    /// </summary>
    public Priority Priority =>
        Baz != null ? Priority.Low
        : Bar != null ? Priority.Medium
        : Priority.High;

    /// <summary>
    /// Orders tokens so that higher-priority tokens come first.
    /// </summary>
    /// <param name="other">The token to compare against; may be null.</param>
    /// <returns>
    /// -1 when this token has the higher priority, 1 when it has the lower priority or
    /// <paramref name="other"/> is null; for equal priorities, the result of comparing
    /// the Foo (High), Bar (Medium) or Baz (Low) payloads.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// The priority is not one of the defined <see cref="Priority"/> values.
    /// </exception>
    public int CompareTo(Token other)
    {
        // IComparable<T> contract: any instance sorts after null.
        if (other == null)
        {
            return 1;
        }

        var mine = Priority;
        var theirs = other.Priority;

        // Inverted on purpose: a larger Priority value must sort earlier.
        if (mine > theirs)
        {
            return -1;
        }

        if (mine < theirs)
        {
            return 1;
        }

        // Equal priority means the same payload is non-null on both sides.
        switch (mine)
        {
            case Priority.High:
                return Foo.CompareTo(other.Foo);
            case Priority.Medium:
                return Bar.CompareTo(other.Bar);
            case Priority.Low:
                return Baz.CompareTo(other.Baz);
            default:
                throw new ArgumentOutOfRangeException();
        }
    }

    /// <summary>Returns the text assembled by the constructors, e.g. "Foo:1:Bar:7".</summary>
    public override string ToString()
    {
        return _toString;
    }

    /// <summary>Creates a High-priority token that carries only a Foo.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="foo"/> is null.</exception>
    public Token(Foo foo)
    {
        if (foo == null)
        {
            throw new ArgumentNullException(nameof(foo));
        }

        _toString = $"{nameof(Foo)}:{foo.Id}";
        Foo = foo;
    }

    /// <summary>Creates a Medium-priority token that carries a Foo and a Bar.</summary>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="foo"/> or <paramref name="bar"/> is null.
    /// </exception>
    public Token(Foo foo, Bar bar) : this(foo)
    {
        if (bar == null)
        {
            throw new ArgumentNullException(nameof(bar));
        }

        _toString += $":{nameof(Bar)}:{bar.Id}";
        Bar = bar;
    }

    /// <summary>Creates a Low-priority token that carries a Foo and a Baz.</summary>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="foo"/> or <paramref name="baz"/> is null.
    /// </exception>
    public Token(Foo foo, Baz baz) : this(foo)
    {
        if (baz == null)
        {
            throw new ArgumentNullException(nameof(baz));
        }

        _toString += $":{nameof(Baz)}:{baz.Id}";
        Baz = baz;
    }
}
And here is my producer code:
var random = new Random();
var bazId = 0;
var barId = 0;
var fooTokens = (from id in Observable.Interval(TimeSpan.FromSeconds(1))
select new Token(new Foo { Id = id }))
var barTokens = (from fooToken in fooTokens
from id in Observable.Range(0, random.Next(5, 10))
.Select(_ => Interlocked.Increment(ref barId))
select new Token(fooToken.Foo, new Bar { Id = id }))
var bazTokens = (from barToken in barTokens
from id in Observable.Range(0, random.Next(1, 5))
.Select(_ => Interlocked.Increment(ref bazId))
select new Token(barToken.Foo, new Baz { Id = id }))
var tokens = bazTokens.Merge(barTokens)
.Do(dt =>
Console.ForegroundColor = ConsoleColor.Red;
However, I am a little stuck on how to buffer and sort the tokens. If I do the following, the tokens appear to be produced and consumed at the same time, which suggests that some buffering is going on behind the scenes, but I can’t control it.
tokens.Subscribe(dt =>
Console.ForegroundColor = ConsoleColor.Green;
TPL ActionBlock
, , , , .
var proc = new ActionBlock<Token>(dt =>
Console.ForegroundColor = ConsoleColor.Green;
tokens.Subscribe(dt => proc.Post(dt));
, , !
. :
private static void Display(Token dt, ConsoleColor col, int? wait = null)
if (wait.HasValue)
Console.ForegroundColor = col;
var set = new SortedSet<Token>();
var tokens = bazTokens
.Do(dt => Display(dt, ConsoleColor.Red));
tokens.Subscribe(dt => set.Add(dt));
, :
var source = new CancellationTokenSource();
Task.Run(() =>
while (!source.IsCancellationRequested)
var dt = set.FirstOrDefault();
if (dt == null)
if (set.Remove(dt))
Display(dt, ConsoleColor.Green, 250);
}, source.Token);
, , , ) while
b) , . , , - !