C # multithreaded concurrency mechanism for waiting on a given result

I need a mechanism to implement the following scenario:

  • two or more threads must simultaneously load a given set of values
  • only one query should be executed for each value, so if two threads need to load the same subsets, you need to wait for another
  • I do not want to have a lock (or mutex or other primitive) for each value, as they can be potentially too high.

There could be a scenario (suppose stream B comes in a bit earlier)

          thread A           thread B
values    5, 8, 9, 12        7, 8, 9, 13, 14
request   5,       12        7, 8, 9, 13, 14
waits for    8, 9
                             >>data loaded<<
retrieves    8, 9

          >> data loaded <<
returns   5, 8, 9, 12

What parallel mechanism should I use for this?

Remember that the producer / consumer will not work, since flows A and B are not exact consumers (they are only interested in certain data).

thank

+3
1

, ?

class LockManager<TKey>
{
    private Dictionary<TKey, List<EventWaitHandle>> locks =
        new Dictionary<TKey, List<EventWaitHandle>>();
    private Object syncRoot = new Object();

    public void Lock(TKey key)
    {
        do
        {
            Monitor.Enter(syncRoot);
            List<EventWaitHandle> waiters = null;
            if (true == locks.TryGetValue(key, out waiters))
            {
                // Key is locked, add ourself to waiting list
                // Not that this code is not safe under OOM conditions
                AutoResetEvent eventLockFree = new AutoResetEvent(false);
                waiters.Add(eventLockFree);
                Monitor.Exit(syncRoot);
                // Now wait for a notification
                eventLockFree.WaitOne();
            }
            else
            {
                // event is free
                waiters = new List<EventWaitHandle>();
                locks.Add(key, waiters);
                Monitor.Exit(syncRoot);
                // we're done
                break;
            }
        } while (true);

    }

    public void Release(TKey key)
    {
        List<EventWaitHandle> waiters = null;
        lock (syncRoot)
        {
            if (false == locks.TryGetValue(key, out waiters))
            {
                Debug.Assert(false, "Releasing a bad lock!");
            }
            locks.Remove(key);
        }
        // Notify ALL waiters. Unfair notifications
        // are better than FIFO for reasons of lock convoys
        foreach (EventWaitHandle waitHandle in waiters)
        {
            waitHandle.Set();
        }
    }
}

:

class Program
{
    class ThreadData
    {
        public LockManager<int> LockManager { get; set; }
        public int[] Work { get; set; }
        public AutoResetEvent Done { get; set; }
    }

    static void Main(string[] args)
    {
        int[] forA = new int[] {5, 8, 9, 12};
        int[] forB = new int[] {7, 8, 9, 13, 14 };

        LockManager<int> lockManager = new LockManager<int>();

        ThreadData tdA = new ThreadData
        {
            LockManager = lockManager,
            Work = forA,
            Done = new AutoResetEvent(false),
        };
        ThreadData tdB = new ThreadData
        {
            LockManager = lockManager,
            Work = forB,
            Done = new AutoResetEvent(false),
        };

        ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), tdA);
        ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), tdB);

        WaitHandle.WaitAll(new WaitHandle[] { tdA.Done, tdB.Done });
    }

    static void Worker(object args)
    {
        Debug.Assert(args is ThreadData);
        ThreadData data = (ThreadData) args;
        try
        {
            foreach (int key in data.Work)
            {
                data.LockManager.Lock(key);
                Console.WriteLine("~{0}: {1}",
                    Thread.CurrentThread.ManagedThreadId, key);
                // simulate the load the set for Key
                Thread.Sleep(1000);
            }
            foreach (int key in data.Work)
            {
                // Now free they locked keys
                data.LockManager.Release(key);
            }
        }
        catch (Exception e)
        {
            Debug.Write(e);
        }
        finally
        {
            data.Done.Set();
        }
    }
}

, , - . {5,8,9,7} {7,8,9,5}, .

+1

Source: https://habr.com/ru/post/1731080/


All Articles