Is there an equivalent of await for threads?

Hi, I would like to know whether there is something similar to the await keyword used with tasks that I can use with threads in C#.

What I want to do:

Run Thread A, calculate some data and store the result in the variable x. After that, x is handed over to another thread, B. At the same time, Thread A starts calculating new data, and Thread B starts another calculation using the result x.

UPDATE: Well, there seems to be some confusion, so I will be more precise in my description:

I use two sensors that produce data. The data must be acquired as follows: SensorA data is retrieved first (which takes a long time), and immediately after that the data from SensorB must be retrieved in a different thread, while SensorA continues retrieving the next data block. The problem is that I cannot put the data of both sensors into one queue, but I need to store the data of both sensors in ONE data structure/object.

My idea was this:

  • Receive data from sensor A in Thread A.
  • Give the result to Thread B and restart Thread A.
  • While Thread A runs again, Thread B receives data from sensor B and computes with the data from sensors A and B.

It can be assumed that Thread A always takes longer than Thread B.

+5
3 answers

As I said in the comments, this looks like a classic producer/consumer problem, for which we can use, for example, a BlockingCollection.

This is a slightly modified version of the sample from this page:

    BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

    // "Thread B"
    Task.Run(() =>
    {
        while (!dataItems.IsCompleted)
        {
            Data dataA = null;
            try
            {
                dataA = dataItems.Take();
            }
            catch (InvalidOperationException) { }

            if (dataA != null)
            {
                var dataB = ReadSensorB();
                Process(dataA, dataB);
            }
        }
        Console.WriteLine("\r\nNo more items to take.");
    });

    // "Thread A"
    Task.Run(() =>
    {
        while (moreItemsToAdd)
        {
            Data dataA = ReadSensorA();
            dataItems.Add(dataA);
        }
        // Let consumer know we are done.
        dataItems.CompleteAdding();
    });

Here, moreItemsToAdd stands for whatever logic you need to decide when this process should shut down.
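One possible way to fill in the shutdown condition is a CancellationToken. The following is only a sketch under assumptions: SensorPipeline, Run, maxItems and the two ReadSensor methods are hypothetical stand-ins (plain ints replace the Data type), and GetConsumingEnumerable is used in place of the Take/try/catch loop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SensorPipeline
{
    // Hypothetical stand-ins for the real sensor reads.
    static int ReadSensorA(int i) { Thread.Sleep(20); return i; }
    static int ReadSensorB() { Thread.Sleep(5); return 100; }

    // Produces up to maxItems values (or stops early when the token is
    // cancelled) and returns the combined A + B results in arrival order.
    public static List<int> Run(int maxItems, CancellationToken token)
    {
        var results = new List<int>();
        using (var dataItems = new BlockingCollection<int>(100))
        {
            // "Thread B": GetConsumingEnumerable blocks until an item arrives
            // and ends once CompleteAdding was called and the queue is empty.
            Task consumer = Task.Run(() =>
            {
                foreach (int dataA in dataItems.GetConsumingEnumerable())
                    results.Add(dataA + ReadSensorB());
            });

            // "Thread A": the loop condition plays the role of moreItemsToAdd.
            Task producer = Task.Run(() =>
            {
                try
                {
                    for (int i = 0; i < maxItems && !token.IsCancellationRequested; i++)
                        dataItems.Add(ReadSensorA(i));
                }
                finally
                {
                    // Let the consumer know we are done, even after an error.
                    dataItems.CompleteAdding();
                }
            });

            Task.WaitAll(producer, consumer);
        }
        return results;
    }
}
```

Calling SensorPipeline.Run(3, CancellationToken.None) returns 100, 101, 102 with these stand-in reads; passing a token from a CancellationTokenSource lets another part of the program stop the producer early, and the consumer then drains whatever is still queued.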

+6

I'm not sure why you are avoiding tasks. Maybe you are using an older version of .NET? If so, BlockingCollection, as Damien suggested, is not an option either. If you use "regular" threads, you can use a WaitHandle to signal between threads that a new result is ready, for example an AutoResetEvent.

    private int a;
    private AutoResetEvent newResult = new AutoResetEvent(false);

    private void ThreadA()
    {
        while (true)
        {
            a = GetSensorA();
            newResult.Set();
        }
    }

    private void ThreadB()
    {
        int b;
        while (true)
        {
            newResult.WaitOne();
            b = GetSensorB(); // or before "WaitOne"
            Console.WriteLine(a + b); // do something
        }
    }

Edit: there was a small error with the reset; thanks to Damien for pointing it out. Updated.
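For anyone who wants to try the hand-off in isolation, here is a minimal runnable sketch of the same idea. It rests on several assumptions that are not in the answer above: the HandOff class, the rounds parameter, the sleep-based fake sensor reads and the -1 "finished" sentinel are all hypothetical, and Volatile requires .NET 4.5 or later. Thread B copies the shared value into a local as soon as it is signalled, so a later write by Thread A cannot change it mid-calculation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class HandOff
{
    public static List<int> Run(int rounds)
    {
        int a = 0;                                  // shared slot written by A
        var newResult = new AutoResetEvent(false);
        var results = new List<int>();              // only touched by thread B

        var threadA = new Thread(() =>
        {
            for (int i = 1; i <= rounds; i++)
            {
                Thread.Sleep(50);                   // hypothetical slow sensor A read
                Volatile.Write(ref a, i);
                newResult.Set();                    // tell B a value is ready
            }
            Volatile.Write(ref a, -1);              // hypothetical "finished" sentinel
            newResult.Set();
        });

        var threadB = new Thread(() =>
        {
            while (true)
            {
                newResult.WaitOne();
                int snapshot = Volatile.Read(ref a); // copy before A overwrites it
                if (snapshot == -1) break;
                Thread.Sleep(5);                    // hypothetical fast sensor B read
                results.Add(snapshot + 100);        // combine A with a fake B value
            }
        });

        threadA.Start();
        threadB.Start();
        threadA.Join();
        threadB.Join();
        return results;
    }
}
```

Note that an AutoResetEvent does not queue signals: if Thread B ever fell behind Thread A, it would skip or re-read values. The question's assumption that A is always slower than B is what keeps the two threads in step here; when that cannot be guaranteed, a queue is the safer choice.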

+3

If you can use .NET 4.5 or later, then the best way to approach this is to use the TPL Dataflow library.

(You must use NuGet to install Dataflow; it is not installed by default.)

Here is a compilable console application demonstrating how to use Dataflow for this:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace SensorDemo
    {
        public sealed class SensorAData
        {
            public int Data;
        }

        public sealed class SensorBData
        {
            public double Data;
        }

        public sealed class SensorData
        {
            public SensorAData SensorAData;
            public SensorBData SensorBData;

            public override string ToString()
            {
                return $"SensorAData = {SensorAData.Data}, SensorBData = {SensorBData.Data}";
            }
        }

        class Program
        {
            static void Main()
            {
                var sensorADataSource = new TransformBlock<SensorAData, SensorData>(
                    sensorAData => addSensorBData(sensorAData),
                    dataflowOptions());

                var combinedSensorProcessor = new ActionBlock<SensorData>(
                    data => process(data),
                    dataflowOptions());

                sensorADataSource.LinkTo(combinedSensorProcessor, new DataflowLinkOptions { PropagateCompletion = true });

                // Create a cancellation source that will cancel after a few seconds.
                var cancellationSource = new CancellationTokenSource(delay: TimeSpan.FromSeconds(20));

                Task.Run(() => continuouslyReadFromSensorA(sensorADataSource, cancellationSource.Token));

                Console.WriteLine("Started reading from SensorA");

                sensorADataSource.Completion.Wait(); // Wait for reading from SensorA to complete.
                Console.WriteLine("Completed reading from SensorA.");

                combinedSensorProcessor.Completion.Wait();
                Console.WriteLine("Completed processing of combined sensor data.");
            }

            static async Task continuouslyReadFromSensorA(TransformBlock<SensorAData, SensorData> queue, CancellationToken cancellation)
            {
                while (!cancellation.IsCancellationRequested)
                    await queue.SendAsync(readSensorAData());

                queue.Complete();
            }

            static SensorData addSensorBData(SensorAData sensorAData)
            {
                return new SensorData
                {
                    SensorAData = sensorAData,
                    SensorBData = readSensorBData()
                };
            }

            static SensorAData readSensorAData()
            {
                Console.WriteLine("Reading from Sensor A");
                Thread.Sleep(1000); // Simulate reading sensor A data taking some time.
                int value = Interlocked.Increment(ref sensorValue);
                Console.WriteLine("Read Sensor A value = " + value);
                return new SensorAData { Data = value };
            }

            static SensorBData readSensorBData()
            {
                Console.WriteLine("Reading from Sensor B");
                Thread.Sleep(100); // Simulate reading sensor B data being much quicker.
                int value = Interlocked.Increment(ref sensorValue);
                Console.WriteLine("Read Sensor B value = " + value);
                return new SensorBData { Data = value };
            }

            static void process(SensorData value)
            {
                Console.WriteLine("Processing sensor data: " + value);
                Thread.Sleep(1000); // Simulate slow processing of combined sensor values.
            }

            static ExecutionDataflowBlockOptions dataflowOptions()
            {
                return new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 1
                };
            }

            static int sensorValue;
        }
    }

+3

Source: https://habr.com/ru/post/1240672/