Dart: how to control concurrency in an asynchronous function

I really like the async/await pattern in Dart. It lets me write readable methods.

But a few things are problematic, and there is one in particular that I don't know how to handle at all.

The problem is that with async and multiple awaits inside a method, we introduce concurrency into the method. For example, if I have a method:

    Future<int> foo(int value) async {
      await foo2();
      await foo3();
      await foo4();
      int ret = foo5(value);
      return ret;
    }

Well, this is a really simple example. The problem here is that at each await, the method yields to the event loop. That is normal once you understand it, but it does not stop your application from invoking the method again before the first call has returned its value.

Consider the case where the method manipulates data that is shared by the class instance rather than local to the method itself.
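A minimal sketch of the kind of interleaving described above. The `Counter` class, `addSlowly`, and `runDemo` are hypothetical names made up for illustration; the point is only that two calls started back to back both read the shared field before either one writes it.

```dart
import 'dart:async';

// Hypothetical class for illustration: shared state updated across an await.
class Counter {
  int total = 0;

  Future<void> addSlowly(int amount) async {
    final before = total; // read the shared state
    await Future<void>.delayed(Duration.zero); // yield to the event loop
    total = before + amount; // write back a value that may be stale
  }
}

Future<int> runDemo() async {
  final counter = Counter();
  // Both calls start before either one has finished.
  await Future.wait([counter.addSlowly(1), counter.addSlowly(2)]);
  return counter.total;
}

Future<void> main() async {
  // One of the two updates is lost, so the result is not 3.
  print(await runDemo());
}
```

Both calls see `total == 0` before the first await, so one of the additions is overwritten.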

So, I tried the following solution:

    bool isWorking = false;

    Future<int> foo(int value) async {
      if (isWorking) return foo(value);
      isWorking = true;
      await foo2();
      await foo3();
      await foo4();
      int ret = foo5(value);
      isWorking = false;
      return ret;
    }

As I understood it, calling a method that returns a Future immediately puts it on the event loop, so I thought that a second, overlapping call to the method would be delayed until the first one completed. But that is not the case: the program enters an endless loop.

Can anyone give me an explanation and solution to this issue?

Edit: In general, I think it would be interesting to have, like in other languages, a synchronized keyword with the meaning that the method, if called a second time, will wait until the first one ends. Sort of:

 Future<int> foo(int value) async synchronized { 

Edit 2:

I am very excited because I think I have a solution to this problem, which I have had for a long time. Thanks to Argenti and, in particular, to Alexander, who gave me the solution. I just reorganized it for easy reuse (at least for me), and I am posting here the class I created and an example of how to use it, for those who need it (use at your own risk ;-)). I used a mixin because I find it practical, but you can use the Locker class directly if you want.

    class MyClass extends Object with LockManager {
      Locker locker = LockManager.getLocker();

      Future<int> foo(int value) async {
        // Re-invokes foo with the same argument once the lock is released.
        _recall() {
          return foo(value);
        }

        if (locker.locked) {
          return await locker.waitLock();
        }
        locker.setFunction(_recall);
        locker.lock();
        await foo2();
        await foo3();
        await foo4();
        int ret = foo5(value);
        locker.unlock();
        return ret;
      }
    }

The class:

    import 'dart:async';

    class LockManager {
      static Locker getLocker() => new Locker();
    }

    class Locker {
      Future<Null> _isWorking = null;
      Completer<Null> completer;
      Function _function;

      bool get locked => _isWorking != null;

      lock() {
        completer = new Completer();
        _isWorking = completer.future;
      }

      unlock() {
        completer.complete();
        _isWorking = null;
      }

      waitLock() async {
        await _isWorking;
        return _function();
      }

      setFunction(Function fun) {
        if (_function == null) _function = fun;
      }
    }

I structured the code so that you can easily use it in several methods inside your classes. In this case, you need a Locker instance for each method. I hope this can be helpful.

+5
4 answers

Instead of a boolean, you can use a Future and a Completer to achieve the desired result:

    import 'dart:async';

    Future<Null> isWorking = null;

    Future<int> foo(int value) async {
      if (isWorking != null) {
        await isWorking; // wait for the future to complete
        return foo(value);
      }

      // lock
      var completer = new Completer<Null>();
      isWorking = completer.future;

      await foo2();
      await foo3();
      await foo4();
      int ret = foo5(value);

      // unlock
      completer.complete();
      isWorking = null;

      return ret;
    }

The first time the method is called, isWorking is null, so the call skips the if block and sets isWorking to a Future that will be completed at the end of the method. If another call to foo is made before the first call completes the isWorking Future, that call enters the if block and waits for isWorking to complete. The same goes for every call made before the first call completes. Once the first call completes (and isWorking is set to null), the waiting calls are notified and call foo again. One of them enters foo as the new first call, and the same workflow repeats.
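The workflow above can be sketched as a runnable program. This is a variation under stated assumptions, not the answer's exact code: the recursive re-call is swapped for a `while` loop, the unlock is moved into `try`/`finally` so that an exception does not leave the lock held, and `Worker`, `maxActive`, and `runDemo` are hypothetical names. A short delay stands in for `foo2()` to `foo4()`.

```dart
import 'dart:async';

// Hypothetical class for illustration.
class Worker {
  Future<void>? _isWorking; // non-null while a call holds the lock
  int _active = 0; // calls currently inside the guarded section
  int maxActive = 0; // highest value _active ever reached

  Future<int> foo(int value) async {
    // Wait until no other call holds the lock, re-checking after each wake-up.
    while (_isWorking != null) {
      await _isWorking;
    }

    // lock
    final completer = Completer<void>();
    _isWorking = completer.future;
    try {
      _active++;
      if (_active > maxActive) maxActive = _active;
      // Stand-in for the awaited work (foo2, foo3, foo4 in the question).
      await Future<void>.delayed(const Duration(milliseconds: 10));
      _active--;
      return value * 2;
    } finally {
      // unlock, even if the guarded section threw
      _isWorking = null;
      completer.complete();
    }
  }
}

Future<List<int>> runDemo(Worker worker) {
  // Three overlapping calls, started before any of them finishes.
  return Future.wait([worker.foo(1), worker.foo(2), worker.foo(3)]);
}

Future<void> main() async {
  final worker = Worker();
  print(await runDemo(worker));
  print('max concurrent calls in guarded section: ${worker.maxActive}');
}
```

The `maxActive` counter is only there to make the exclusion observable: it stays at 1 as long as the guarded sections never overlap.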

See https://dartpad.dartlang.org/dceafcb4e6349acf770b67c0e816e9a7 for a better view of the workflow.

+2

The answers are great; here is just another mutex implementation that prevents asynchronous operations from interleaving.

    import 'dart:async';

    class AsyncMutex {
      Future _next = new Future.value(null);

      /// Request [operation] to be run exclusively.
      ///
      /// Waits for all previously requested operations to complete,
      /// then runs the operation and completes the returned future with the
      /// result.
      Future<T> run<T>(Future<T> operation()) {
        var completer = new Completer<T>();
        _next.whenComplete(() {
          completer.complete(new Future<T>.sync(operation));
        });
        return _next = completer.future;
      }
    }

It doesn't have many features, but it is short and, I hope, understandable.
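A small usage sketch for the mutex, assuming a current Dart SDK with null safety (so the class is restated here with only the type annotations adjusted). The `runDemo` and `step` helpers and the delays are hypothetical; the delays are chosen so that, without the mutex, the operations would finish in reverse order.

```dart
import 'dart:async';

// The AsyncMutex from above, with null-safe type annotations.
class AsyncMutex {
  Future<void> _next = Future<void>.value();

  Future<T> run<T>(Future<T> Function() operation) {
    final completer = Completer<T>();
    _next.whenComplete(() {
      completer.complete(Future<T>.sync(operation));
    });
    return _next = completer.future;
  }
}

Future<List<String>> runDemo() async {
  final mutex = AsyncMutex();
  final log = <String>[];

  // Each step records when it starts and ends inside the mutex.
  Future<void> step(String name, int ms) => mutex.run(() async {
        log.add('$name start');
        await Future<void>.delayed(Duration(milliseconds: ms));
        log.add('$name end');
      });

  // All three are requested at once; the mutex runs them one at a time.
  await Future.wait([step('a', 30), step('b', 10), step('c', 1)]);
  return log;
}

Future<void> main() async {
  print(await runDemo());
}
```

Each operation starts only after the previous one has ended, in the order the calls to `run` were made. Note that this sketch does not exercise the case where an operation throws.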

+3

I assume that you really need a Dart library that implements concurrency control primitives such as locks, mutexes, and semaphores.

I recently used the Pool package to effectively implement a mutex that prevents "concurrent" access to a resource. (It was throwaway code, so please do not take it as a high-quality solution.)

Simplifying the example a bit:

    import 'dart:async';
    import 'package:pool/pool.dart';

    final Pool pool = new Pool(1, timeout: new Duration(...));

    Future<Null> foo(thing, ...) async {
      PoolResource rp = await pool.request();

      await foo1();
      await foo2();
      ...

      rp.release();
    }

Requesting a resource from the pool before calling the asynchronous functions inside foo() ensures that when several concurrent calls are made to foo(), the calls to foo1() and foo2() will not interleave incorrectly.

Edit:

It seems that there are several packages that address the provision of mutexes: https://www.google.com/search?q=dart+pub+mutex .

+2

Async and concurrency are two separate topics. There is nothing parallel in the code above; everything runs sequentially. The code you have keeps issuing more foo(value) calls while isWorking is true, so it never completes.

The tool you are looking for is Completer<T>. The specific piece of code in question is hard to follow, so I will give you another example. Say you have code that uses a database connection. Opening a database connection is an asynchronous operation. When one method requests a connection to the database, it waits for the open to complete. During this wait, another method requests a connection to the database. The desired result is that only one database connection is opened and that same connection is returned to both callers:
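A short sketch of why the boolean guard in the question can never make progress, assuming Dart 2 or later, where the body of an async function runs synchronously up to its first await. The `work` and `runDemo` names are hypothetical.

```dart
import 'dart:async';

Future<void> work(List<String> events) async {
  // This line runs synchronously, inside the call to work().
  events.add('work: before await');
  await Future<void>.value(); // first suspension point
  events.add('work: after await');
}

Future<List<String>> runDemo() async {
  final events = <String>[];
  final pending = work(events);
  // By the time work() returns its future, the code before its first
  // await has already executed.
  events.add('caller: work() returned');
  await pending;
  return events;
}

Future<void> main() async {
  print(await runDemo());
}
```

Under that assumption, `if (isWorking) return foo(value);` re-enters foo before reaching any await, so control never returns to the first call and isWorking is never reset.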

    import 'dart:async';

    Connection connection;
    Completer<Connection> connectionCompleter;

    bool get isConnecting => connectionCompleter != null;

    Future<Connection> getDatabaseConnection() {
      if (isConnecting) {
        // A connection attempt is already in flight: share its future.
        return connectionCompleter.future;
      }

      connectionCompleter = new Completer<Connection>();

      connect().then((c) {
        connection = c;
        connectionCompleter.complete(c);
        connectionCompleter = null;
      });

      return connectionCompleter.future;
    }

The first time getDatabaseConnection is called, the connect function is called and the completer's future is returned to the caller. If getDatabaseConnection is called again before connect completes, the same completer's future is returned, but connect is not called again.

When connect completes, the completion event is added to the event queue. This causes the two futures returned from getDatabaseConnection to complete, in order, and the connection will be valid.

Note that getDatabaseConnection is not even an async method; it just returns a Future. It could be marked async, but that would just waste CPU cycles and make your call stack uglier.
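The example above can be made runnable with a stand-in for the database. In this sketch, `Connection`, `Database`, `connectCalls`, and the fake `connect` with its delay are all hypothetical, and a failing `connect` is not handled.

```dart
import 'dart:async';

// Hypothetical stand-in for a real database connection.
class Connection {
  final int id;
  Connection(this.id);
}

class Database {
  int connectCalls = 0; // how many times connect() actually ran
  Connection? connection;
  Completer<Connection>? _connectionCompleter;

  // Fake asynchronous open, for illustration only.
  Future<Connection> connect() async {
    connectCalls++;
    await Future<void>.delayed(const Duration(milliseconds: 10));
    return Connection(connectCalls);
  }

  Future<Connection> getDatabaseConnection() {
    final pending = _connectionCompleter;
    if (pending != null) {
      // A connection attempt is in flight: hand out the same future.
      return pending.future;
    }

    final completer = Completer<Connection>();
    _connectionCompleter = completer;

    connect().then((c) {
      connection = c;
      _connectionCompleter = null;
      completer.complete(c);
    });

    return completer.future;
  }
}

Future<void> main() async {
  final db = Database();
  final results = await Future.wait(
      [db.getDatabaseConnection(), db.getDatabaseConnection()]);
  print('connect() calls: ${db.connectCalls}');
  print('same connection: ${identical(results[0], results[1])}');
}
```

Both callers receive the same Connection object, and `connect` runs once for the two overlapping requests.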

0

Source: https://habr.com/ru/post/1263882/

