I have a stream of tokens that are produced very quickly, and a consumer that processes them relatively slowly. The tokens come in three subtypes, and I would like them to be processed in order of priority. To that end, I want tokens to be buffered while they wait for processing, with the buffer kept sorted by priority.
Here are my classes:
/// <summary>
/// Processing priority of a token. A larger numeric value means a more urgent token.
/// </summary>
public enum Priority
{
    /// <summary>Least urgent.</summary>
    Low = 1,

    /// <summary>Intermediate urgency.</summary>
    Medium = 2,

    /// <summary>Most urgent.</summary>
    High = 3
}
/// <summary>
/// Common base for identifiable items; instances are ordered by <see cref="Id"/>.
/// </summary>
public class Base : IComparable<Base>
{
    /// <summary>Identifier used as the sort key.</summary>
    public int Id { get; set; }

    /// <summary>
    /// Compares this instance with <paramref name="other"/> by <see cref="Id"/>.
    /// </summary>
    /// <param name="other">The instance to compare against; may be null.</param>
    /// <returns>
    /// A negative value, zero, or a positive value when this instance's Id is respectively
    /// less than, equal to, or greater than the other's. A non-null instance always
    /// sorts after null.
    /// </returns>
    public int CompareTo(Base other)
    {
        // IComparable<T> contract: any instance compares greater than null.
        // Without this guard, other.Id throws NullReferenceException.
        if (other == null)
        {
            return 1;
        }

        return Id.CompareTo(other.Id);
    }
}
/// <summary>Item carried by every <c>Token</c>; adds nothing to <see cref="Base"/>.</summary>
public class Foo : Base
{
}

/// <summary>Item whose presence on a <c>Token</c> makes it medium priority; adds nothing to <see cref="Base"/>.</summary>
public class Bar : Base
{
}

/// <summary>Item whose presence on a <c>Token</c> makes it low priority; adds nothing to <see cref="Base"/>.</summary>
public class Baz : Base
{
}
/// <summary>
/// A unit of work that always carries a <see cref="Foo"/> and optionally a
/// <see cref="Bar"/> or a <see cref="Baz"/>. Which of these is present determines
/// its <see cref="Priority"/>, and tokens sort with the highest priority first.
/// </summary>
public class Token : IComparable<Token>
{
    // Display text, built up incrementally by the chained constructors.
    private readonly string _toString;

    /// <summary>The Foo this token belongs to; never null.</summary>
    public Foo Foo { get; }

    /// <summary>The Bar of a medium-priority token; null otherwise.</summary>
    public Bar Bar { get; }

    /// <summary>The Baz of a low-priority token; null otherwise.</summary>
    public Baz Baz { get; }

    /// <summary>
    /// Low when a Baz is present, Medium when a Bar is present, High when the token
    /// carries only a Foo.
    /// </summary>
    public Priority Priority =>
        Baz != null
            ? Priority.Low
            : Bar != null
                ? Priority.Medium
                : Priority.High;

    /// <summary>
    /// Orders tokens by descending priority, then by the Id of the item that defines
    /// that priority (Foo for High, Bar for Medium, Baz for Low).
    /// </summary>
    /// <param name="other">The token to compare against; may be null.</param>
    /// <returns>
    /// A negative value when this token should be processed before <paramref name="other"/>,
    /// a positive value when it should be processed after, and zero when they are equivalent.
    /// A non-null token always sorts after null.
    /// </returns>
    public int CompareTo(Token other)
    {
        // IComparable<T> contract: any instance compares greater than null.
        // Without this guard, other.Priority throws NullReferenceException.
        if (other == null)
        {
            return 1;
        }

        var mine = Priority;
        var theirs = other.Priority;

        // Higher priority sorts first, hence the inverted sign.
        if (mine > theirs)
        {
            return -1;
        }

        if (mine < theirs)
        {
            return 1;
        }

        // Same priority on both sides, so the same optional item is populated on both.
        switch (mine)
        {
            case Priority.High:
                return Foo.CompareTo(other.Foo);
            case Priority.Medium:
                return Bar.CompareTo(other.Bar);
            case Priority.Low:
                return Baz.CompareTo(other.Baz);
            default:
                throw new ArgumentOutOfRangeException();
        }
    }

    /// <summary>Returns the text assembled at construction, e.g. "Foo:1:Bar:7".</summary>
    public override string ToString()
    {
        return _toString;
    }

    /// <summary>Creates a high-priority token.</summary>
    /// <param name="foo">The Foo this token belongs to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="foo"/> is null.</exception>
    public Token(Foo foo)
    {
        if (foo == null)
        {
            throw new ArgumentNullException(nameof(foo));
        }

        _toString = $"{nameof(Foo)}:{foo.Id}";
        Foo = foo;
    }

    /// <summary>Creates a medium-priority token.</summary>
    /// <param name="foo">The Foo this token belongs to.</param>
    /// <param name="bar">The Bar carried by this token.</param>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    public Token(Foo foo, Bar bar) : this(foo)
    {
        if (bar == null)
        {
            throw new ArgumentNullException(nameof(bar));
        }

        _toString += $":{nameof(Bar)}:{bar.Id}";
        Bar = bar;
    }

    /// <summary>Creates a low-priority token.</summary>
    /// <param name="foo">The Foo this token belongs to.</param>
    /// <param name="baz">The Baz carried by this token.</param>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    public Token(Foo foo, Baz baz) : this(foo)
    {
        if (baz == null)
        {
            throw new ArgumentNullException(nameof(baz));
        }

        _toString += $":{nameof(Baz)}:{baz.Id}";
        Baz = baz;
    }
}
And here is my producer code:
// Demo producer state: a random source for fan-out sizes and running Id counters
// for the Bar and Baz items.
var random = new Random();
var bazId = 0;
var barId = 0;

// Three Foo tokens, one per second, numbered from the interval tick.
var fooTokens = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(Convert.ToInt32)
    .Take(3)
    .Select(id => new Token(new Foo { Id = id }))
    .Publish();

// For every Foo token, a burst of random.Next(5, 10) Bar tokens with fresh Bar ids.
var barTokens = fooTokens
    .SelectMany(
        fooToken => Observable.Range(0, random.Next(5, 10))
            .Select(_ => Interlocked.Increment(ref barId)),
        (fooToken, id) => new Token(fooToken.Foo, new Bar { Id = id }))
    .Publish();

// For every Bar token, a burst of random.Next(1, 5) Baz tokens with fresh Baz ids.
var bazTokens = barTokens
    .SelectMany(
        barToken => Observable.Range(0, random.Next(1, 5))
            .Select(_ => Interlocked.Increment(ref bazId)),
        (barToken, id) => new Token(barToken.Foo, new Baz { Id = id }))
    .Publish();

// All three streams merged; each produced token is logged in red as it passes through.
var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(produced =>
    {
        Console.ForegroundColor = ConsoleColor.Red;
        Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{produced}");
    });

// Connect downstream-first: Baz, then Bar, then Foo.
bazTokens.Connect();
barTokens.Connect();
fooTokens.Connect();
However, I am a little stuck on how to buffer and sort the tokens. If I subscribe like this, the tokens seem to be produced and consumed at the same time, which suggests that some buffering is going on behind the scenes, but I can’t control it.
// Slow consumer: waits 250 ms per token, then logs it in green.
Action<Token> slowConsumer = token =>
{
    Thread.Sleep(250);
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{token}");
};
tokens.Subscribe(slowConsumer);
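I also tried a TPL ActionBlock. It does buffer the tokens, but it processes them in the order they arrive rather than by priority: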
// Dataflow block that handles each posted token: waits 250 ms, then logs it in green.
var proc = new ActionBlock<Token>(token =>
{
    Thread.Sleep(250);
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{token}");
});

// Hand every produced token to the block; the result of Post is ignored.
tokens.Subscribe(token =>
{
    proc.Post(token);
});
Any ideas, hints, or suggestions would be welcome!
Update:
I have made some progress. First, a helper method for displaying tokens:
// Guards the "set colour, then write" pair so that concurrent callers cannot interleave.
private static readonly object _consoleGate = new object();

/// <summary>
/// Writes a timestamped line for <paramref name="dt"/> to the console in the given colour,
/// optionally sleeping first.
/// </summary>
/// <param name="dt">The token to print; formatted via its ToString.</param>
/// <param name="col">Console foreground colour for the line. The colour is not restored afterwards.</param>
/// <param name="wait">Optional delay in milliseconds applied before printing.</param>
private static void Display(Token dt, ConsoleColor col, int? wait = null)
{
    // Sleep outside the lock so a slow caller does not hold up other callers.
    if (wait.HasValue)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(wait.Value));
    }

    // Display is called from more than one thread. Without the lock, another caller
    // could change the colour between the assignment and the write, so the line
    // would be printed in the wrong colour.
    lock (_consoleGate)
    {
        Console.ForegroundColor = col;
        Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
    }
}
Then I use a SortedSet to buffer the tokens in priority order:
// Priority-ordered buffer; Token.CompareTo puts the highest-priority token first.
var set = new SortedSet<Token>();

// All three streams merged; each produced token is displayed in red as it passes through.
var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(dt => Display(dt, ConsoleColor.Red));

// SortedSet<T> is not safe for concurrent use, and this callback runs on a different
// thread from the code that drains the set. Mutate it only while holding a lock on
// the set itself; any other code touching the set must take the same lock.
tokens.Subscribe(dt =>
{
    lock (set)
    {
        set.Add(dt);
    }
});
And then a background task that consumes from the set:
var source = new CancellationTokenSource();

// Background consumer: repeatedly takes the first (highest-priority) token out of the
// set and displays it in green after a 250 ms delay, until cancellation is requested.
Task.Run(() =>
{
    // How long to wait before re-checking an empty set, instead of spinning at full CPU.
    var idleBackoff = TimeSpan.FromMilliseconds(10);

    while (!source.IsCancellationRequested)
    {
        Token dt;
        bool removed;

        // SortedSet<T> is not safe for concurrent use, so read and remove as one step
        // under a lock on the set; code that adds to the set must take the same lock.
        // Min returns null for an empty set and, unlike FirstOrDefault(), does not
        // enumerate, so it cannot fail with "collection was modified".
        lock (set)
        {
            dt = set.Min;
            removed = dt != null && set.Remove(dt);
        }

        if (dt == null)
        {
            // Nothing buffered: wait briefly. The wait ends early if cancellation is signalled.
            source.Token.WaitHandle.WaitOne(idleBackoff);
            continue;
        }

        // The slow work happens outside the lock so the set stays available meanwhile.
        if (removed)
        {
            Display(dt, ConsoleColor.Green, 250);
        }
    }
}, source.Token);
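This works, but: a) I am not happy with the polling while loop, and
b) it does not feel like the right way to do this with Rx. Any ideas, hints, or suggestions would be welcome!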