Multiple Threads Consuming the Same IEnumerable Using yield

I use a third-party library that iterates over very large flat files, which can take many minutes. The library provides an enumerator, so you can yield each result and process it while the enumerator fetches the next item from the flat file.

For example:

    IEnumerable<object> GetItems()
    {
        var cursor = new Cursor();
        try
        {
            cursor.Open();
            while (!cursor.EOF)
            {
                // Placeholder: the object built from the cursor's current record.
                yield return new object();
                cursor.MoveNext();
            }
        }
        finally
        {
            if (cursor.IsOpen)
            {
                cursor.Close();
            }
        }
    }

What I'm trying to achieve is to have two consumers of the same enumerable, so that I don't need to read the data twice, and so that each consumer can process each item as it arrives instead of waiting for all the items to arrive at once.

    IEnumerable<object> items = GetItems();
    // Thread expects a delegate, so wrap each call in a lambda.
    new Thread(() => SaveToDateBase(items)).Start();
    new Thread(() => SaveSomewhereElse(items)).Start();

I guess what I'm trying to achieve is something like:

"If the item a consumer asks for has already been retrieved, return it; otherwise move next and wait." However, I am aware of possible MoveNext() clashes between the two threads.

Does something like this already exist? If not, any thoughts on how it could be achieved?

Thanks

+4
2 answers

Essentially, you want to cache the IEnumerable<T>'s data, but without waiting for the sequence to finish before consumers can read the cached items. You can do something like this:

    using System.Collections;
    using System.Collections.Generic;

    // Extension methods must live in a static class; the class name is arbitrary.
    public static class CacheExtensions
    {
        public static IEnumerable<T> Cache<T>(this IEnumerable<T> source)
        {
            return new CacheEnumerator<T>(source);
        }

        private class CacheEnumerator<T> : IEnumerable<T>
        {
            private CacheEntry<T> cacheEntry;

            public CacheEnumerator(IEnumerable<T> sequence)
            {
                cacheEntry = new CacheEntry<T>();
                cacheEntry.Sequence = sequence.GetEnumerator();
                cacheEntry.CachedValues = new List<T>();
            }

            public IEnumerator<T> GetEnumerator()
            {
                if (cacheEntry.FullyPopulated)
                {
                    return cacheEntry.CachedValues.GetEnumerator();
                }
                else
                {
                    return iterateSequence<T>(cacheEntry).GetEnumerator();
                }
            }

            IEnumerator IEnumerable.GetEnumerator()
            {
                return this.GetEnumerator();
            }
        }

        private static IEnumerable<T> iterateSequence<T>(CacheEntry<T> entry)
        {
            for (int i = 0; entry.ensureItemAt(i); i++)
            {
                yield return entry.CachedValues[i];
            }
        }

        private class CacheEntry<T>
        {
            public bool FullyPopulated { get; private set; }
            public IEnumerator<T> Sequence { get; set; }
            public List<T> CachedValues { get; set; }

            // One lock per cache entry, so unrelated caches do not block each other.
            private readonly object key = new object();

            /// <summary>
            /// Ensure that the cache has an item at the provided index. If not, take an item
            /// from the input sequence and move it to the cache.
            ///
            /// The method is thread safe.
            /// </summary>
            /// <returns>True if the cache already had enough items or
            /// an item was moved to the cache,
            /// false if there were no more items in the sequence.</returns>
            public bool ensureItemAt(int index)
            {
                // If the cache already has the item, we don't need to lock to know
                // we can get it.
                if (index < CachedValues.Count)
                    return true;
                // If we're done, there are no race conditions here either.
                if (FullyPopulated)
                    return false;

                lock (key)
                {
                    // Re-check the early-exit conditions in case they changed while
                    // we were waiting on the lock.

                    // We already have the cached item.
                    if (index < CachedValues.Count)
                        return true;
                    // We don't have the cached item and there are no uncached items.
                    if (FullyPopulated)
                        return false;

                    // We actually need to get the next item from the sequence.
                    if (Sequence.MoveNext())
                    {
                        CachedValues.Add(Sequence.Current);
                        return true;
                    }
                    else
                    {
                        Sequence.Dispose();
                        FullyPopulated = true;
                        return false;
                    }
                }
            }
        }
    }

Usage example:

    private static IEnumerable<int> interestingIntGenertionMethod(int maxValue)
    {
        for (int i = 0; i < maxValue; i++)
        {
            Thread.Sleep(1000);
            Console.WriteLine("actually generating value: {0}", i);
            yield return i;
        }
    }

    public static void Main(string[] args)
    {
        IEnumerable<int> sequence = interestingIntGenertionMethod(10)
            .Cache();

        int numThreads = 3;
        for (int i = 0; i < numThreads; i++)
        {
            int taskID = i;
            Task.Factory.StartNew(() =>
            {
                foreach (int value in sequence)
                {
                    Console.WriteLine("Task: {0} Value:{1}", taskID, value);
                }
            });
        }

        Console.WriteLine("Press any key to exit...");
        Console.ReadKey(true);
    }
+2

Implementing the pipeline pattern with .NET 4's BlockingCollection<T> and TPL tasks is what you are looking for. See my answer with a complete example at fooobar.com/questions/437176 / ....

Example: 3 simultaneous consumers

    BlockingCollection<string> queue = new BlockingCollection<string>();

    public void Start()
    {
        var producerWorker = Task.Factory.StartNew(() => ProducerImpl());
        var consumer1 = Task.Factory.StartNew(() => ConsumerImpl());
        var consumer2 = Task.Factory.StartNew(() => ConsumerImpl());
        var consumer3 = Task.Factory.StartNew(() => ConsumerImpl());

        Task.WaitAll(producerWorker, consumer1, consumer2, consumer3);
    }

    private void ProducerImpl()
    {
        try
        {
            // 1. Read raw data from a file
            // 2. Preprocess it
            // 3. Add the item to the queue
            //    (`item` stands for the preprocessed record from steps 1 and 2)
            queue.Add(item);
        }
        finally
        {
            // Tell the consumers that no more items will arrive; without this,
            // GetConsumingEnumerable() never ends and Task.WaitAll blocks forever.
            queue.CompleteAdding();
        }
    }

    // ConsumerImpl must be thread safe
    // to allow launching multiple consumers simultaneously
    private void ConsumerImpl()
    {
        foreach (var item in queue.GetConsumingEnumerable())
        {
            // TODO
        }
    }
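One caveat: with a single shared BlockingCollection<T>, each item is taken by exactly one consumer, so the consumers split the work between them. If every consumer has to see every item, as the question asks, one possible variation is to give each consumer its own queue and have the producer add each item to all of them. The following is only a sketch under that assumption; the names `BroadcastSketch` and `Run`, the `int` item type, and the capacity of 100 are made up for illustration and are not part of the original answer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BroadcastSketch
{
    // Hypothetical helper: enumerates `source` once and pushes every item to
    // each consumer's own queue, so all consumers see the full sequence in order.
    public static List<int>[] Run(IEnumerable<int> source, int consumerCount)
    {
        // One bounded queue per consumer. The bound (100 is arbitrary) stops a
        // fast producer from buffering the whole file ahead of a slow consumer.
        var queues = Enumerable.Range(0, consumerCount)
            .Select(_ => new BlockingCollection<int>(boundedCapacity: 100))
            .ToArray();
        var results = new List<int>[consumerCount];

        var consumers = queues.Select((queue, index) => Task.Factory.StartNew(() =>
        {
            var seen = new List<int>();
            // Blocks while the queue is empty; ends after CompleteAdding()
            // has been called and the queue has been drained.
            foreach (int item in queue.GetConsumingEnumerable())
            {
                seen.Add(item); // stand-in for the real per-consumer processing
            }
            results[index] = seen;
        })).ToArray();

        var producer = Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (int item in source)
                {
                    foreach (var queue in queues)
                    {
                        queue.Add(item);
                    }
                }
            }
            finally
            {
                // Always release the consumers, even if the source throws.
                foreach (var queue in queues)
                {
                    queue.CompleteAdding();
                }
            }
        });

        Task.WaitAll(consumers.Concat(new[] { producer }).ToArray());
        return results;
    }

    public static void Main()
    {
        List<int>[] results = Run(Enumerable.Range(1, 5), consumerCount: 2);
        foreach (var seen in results)
        {
            Console.WriteLine(string.Join(",", seen)); // each line: 1,2,3,4,5
        }
    }
}
```

The trade-off is that the producer is held back by the slowest consumer once that consumer's queue is full, which may or may not be acceptable depending on how long each save operation takes.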

If something else is not clear, let me know.

High-level pipeline flow diagram: (image not available)

+5

Source: https://habr.com/ru/post/1442327/

