How can I process a queue with multiple consumer threads when the order of some tasks matters?

I have a queue of tasks that operate on a collection of objects (say, for example, that the objects are entries in an address book).

An example task would be "Update Joe's phone number to 888-555-1212."

The queue may contain several "Update Joe's phone number to ..." tasks at the same time, each with a different phone number. In that case the updates must be applied in order so that the final state is correct (and no, for the sake of argument, it is not possible to put timestamps on the tasks and on the address book entries and discard stale tasks).

It's safe to apply an update for Jane out of order with an update for Joe.

I would like multi-threaded processing of the queue, but I need to synchronize access per person.

Is there a convenient library for this kind of thing? Or am I stuck with using an Executor and doing my own synchronization on "name" in the Runnable's run() method?

+6
3 answers

A simple but imperfect solution to this problem is to maintain an array of secondary queues, one for each processing thread you use. A single master thread pulls items off your main queue and adds each one to the secondary queue whose index is the hash code of the object's key modulo the number of queues (the key being whatever identifies and relates your tasks).

e.g.

int queueIndex = Math.floorMod(myEntity.getKey().hashCode(), queues.length); // floorMod keeps the index non-negative even for negative hash codes

Only one thread processes each queue, and all tasks for the same entity are sent to the same queue, so there are no race conditions.

This solution is imperfect because some threads may end up with longer queues than others. In practice this is unlikely to matter, but it is something to keep in mind.
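The routing described above can be sketched as follows. This is only a minimal illustration, not the answerer's code: it swaps the hand-rolled secondary queues and worker threads for single-threaded executors (each of which owns one FIFO queue), and the class name `PartitionedDispatcher` and its methods are hypothetical. It assumes `submit` is called from one thread, or otherwise in the order the tasks should be applied.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: routes tasks to one of N single-threaded executors by key hash.
class PartitionedDispatcher {
    private final ExecutorService[] queues;

    PartitionedDispatcher(int threads) {
        queues = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            // Each executor owns exactly one worker thread and one FIFO queue.
            queues[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Tasks with equal keys always land on the same executor,
    // so they run one at a time in submission order.
    void submit(Object key, Runnable task) {
        int queueIndex = Math.floorMod(key.hashCode(), queues.length);
        queues[queueIndex].execute(task);
    }

    // Stops accepting tasks and waits for the queued ones to finish.
    // The timeout is applied to each executor in turn.
    boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
        for (ExecutorService q : queues) {
            q.shutdown();
        }
        boolean done = true;
        for (ExecutorService q : queues) {
            done &= q.awaitTermination(timeout, unit);
        }
        return done;
    }
}
```

Updates for Joe and Jane may or may not share an executor, depending on their hash codes, but two updates for Joe never run concurrently or out of order.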

Problems with the simpler solution:

The simpler solution of pulling items off a single queue and then locking on something specific to the affected entity has a race condition (as Aurand pointed out). Given:

 Master Queue [ Task1(entity1), Task2(entity1), ... ] 

Where task1 and task2 both modify the same entity1, and thread1 and thread2 both work on the queue, the expected/desired sequence of events is:

  • Thread1 takes task1
  • Thread1 locks entity1
  • Thread1 edits entity1
  • Thread1 unlocks entity1
  • Thread2 takes task2
  • Thread2 locks entity1
  • Thread2 edits entity1
  • Thread2 unlocks entity1

Unfortunately, even if taking the lock is the first statement in the thread's run method, the following sequence is possible:

  • Thread1 takes task1
  • Thread2 takes task2
  • Thread2 locks entity1
  • Thread2 edits entity1
  • Thread2 unlocks entity1
  • Thread1 locks entity1
  • Thread1 edits entity1
  • Thread1 unlocks entity1

To avoid this, each thread would have to lock something (say, the queue) before taking a task off the queue, and then acquire the entity lock while still holding that parent lock. However, you do not want to block everything by holding the parent lock while waiting for an entity lock, so you can only try to acquire the entity lock and must then handle the case where it cannot be acquired (perhaps by putting the task on another queue). In general, the situation becomes non-trivial.
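One way the scheme in the previous paragraph might look is sketched below, purely as an illustration of why it gets complicated. The names `KeyedTask` and `OrderedTaskSource` are hypothetical. The monitor of `OrderedTaskSource` plays the role of the parent lock, and a set of busy keys stands in for per-entity locks. Instead of moving a blocked task to another queue, this variant leaves it in place and skips over it. It also assumes all tasks are added before the workers start, so an empty queue means the work is finished.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical task type: a unit of work tied to one entity key.
class KeyedTask {
    final String key;
    final Runnable body;

    KeyedTask(String key, Runnable body) {
        this.key = key;
        this.body = body;
    }
}

class OrderedTaskSource {
    private final Deque<KeyedTask> pending = new ArrayDeque<>();
    private final Set<String> busyKeys = new HashSet<>();

    synchronized void add(KeyedTask task) {
        pending.addLast(task);
    }

    // Under the parent lock: claim the oldest task whose key is free.
    // Returns null once nothing is pending (assumes no tasks are added later).
    synchronized KeyedTask take() throws InterruptedException {
        while (true) {
            if (pending.isEmpty()) {
                return null;
            }
            for (Iterator<KeyedTask> it = pending.iterator(); it.hasNext();) {
                KeyedTask task = it.next();
                if (busyKeys.add(task.key)) { // true only if the key was not busy
                    it.remove();
                    return task;
                }
            }
            wait(); // every pending key is busy; done() will wake us up
        }
    }

    // Releases the key so the next task for the same entity can be claimed.
    synchronized void done(KeyedTask task) {
        busyKeys.remove(task.key);
        notifyAll();
    }

    // Runs all pending tasks on the given number of worker threads.
    void drain(int threads) throws InterruptedException {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    KeyedTask task;
                    while ((task = take()) != null) {
                        try {
                            task.body.run();
                        } finally {
                            done(task);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }
}
```

Because the key is marked busy while the parent lock is still held, a second thread cannot claim a later task for the same entity until the first one has finished. The linear scan over pending tasks is a cost that the partitioned-queue approach avoids.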

+3

Conflicts like this are usually resolved by assigning a version to each object. The version is incremented with every update, so if an update arrives at the wrong time, it can be rejected or postponed. Either way, you need some means of deciding which update comes first and which comes second. This technique is called optimistic locking.
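A minimal sketch of a version check, assuming each task can carry the version it expects to update (something the question does not guarantee, and which resembles the timestamp idea the asker ruled out). `VersionedEntry` and `OptimisticEntry` are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot of an address book entry plus its version.
final class VersionedEntry {
    final long version;
    final String phone;

    VersionedEntry(long version, String phone) {
        this.version = version;
        this.phone = phone;
    }
}

class OptimisticEntry {
    private final AtomicReference<VersionedEntry> current;

    OptimisticEntry(String phone) {
        current = new AtomicReference<>(new VersionedEntry(0, phone));
    }

    // Applies the update only if it targets the version currently stored.
    // A false result means the update is stale or early; the caller decides
    // whether to reject or postpone it.
    boolean tryUpdate(long expectedVersion, String newPhone) {
        VersionedEntry seen = current.get();
        if (seen.version != expectedVersion) {
            return false;
        }
        // compareAndSet fails if another thread replaced the snapshot in between.
        return current.compareAndSet(seen, new VersionedEntry(expectedVersion + 1, newPhone));
    }

    VersionedEntry get() {
        return current.get();
    }
}
```

An update prepared against version 1 is refused until the update for version 0 has been applied, so a worker that picks it up too early can requeue it.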

0

One possible solution

Suppose a task is described by some class

 class Task {
     Integer taskGroup;
     // other fields
 }

where taskGroup is an identifier for tasks that must be processed in order of arrival (in your example, each "name" can define its own task group, or, more generally, tasks with the same name belong to the same task group).

Let mainTaskQueue denote a List of Task objects. Then

  • Create a Map<Integer,List<Task>> , say taskGroupsQueues
  • For each task group, create a thread that works through taskGroupsQueues.get(taskGroup) sequentially.
  • The main thread removes a task from mainTaskQueue and appends it to taskGroupsQueues.get(task.taskGroup)
  • Moving tasks from the main queue into the per-group queues and taking tasks out of the per-group queues both need to be synchronized.

In other words: tasks related to the same name are executed in the same thread.
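The steps above might be sketched like this. It is an approximation rather than a literal translation of the list: a lazily created single-threaded executor per group stands in for each List<Task> plus its dedicated thread, which also takes care of the synchronization mentioned in the last step. `GroupTask` (a Task with a hypothetical `work` field) and `GroupDispatcher` are assumed names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical task type: taskGroup comes from the answer, work is an assumption.
class GroupTask {
    final Integer taskGroup;
    final Runnable work;

    GroupTask(Integer taskGroup, Runnable work) {
        this.taskGroup = taskGroup;
        this.work = work;
    }
}

class GroupDispatcher {
    // One single-threaded executor (queue plus thread) per task group.
    private final Map<Integer, ExecutorService> taskGroupsQueues = new ConcurrentHashMap<>();

    // Called by the main thread for each task taken from the main queue.
    void dispatch(GroupTask task) {
        taskGroupsQueues
                .computeIfAbsent(task.taskGroup, group -> Executors.newSingleThreadExecutor())
                .execute(task.work);
    }

    // Stops all group executors and waits for their queued tasks to finish.
    boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
        for (ExecutorService queue : taskGroupsQueues.values()) {
            queue.shutdown();
        }
        boolean done = true;
        for (ExecutorService queue : taskGroupsQueues.values()) {
            done &= queue.awaitTermination(timeout, unit);
        }
        return done;
    }
}
```

Note that this creates one thread per distinct group and never retires idle ones, so with many names it would need a cap or an eviction policy.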

Please note that if the main thread distributes the tasks, it can also do some load balancing: if a task is not bound to a particular queue by an ordering constraint, it should go to the shortest queue. However, be aware that processing can degenerate to a single thread, namely when all your tasks belong to the same task group (the same name, in your case).

Another possible solution (not verified, just a suggestion)

As pointed out in increment1's post and in Aurand's, synchronizing on the task group (name) inside the thread has some problems. Basically, it is too late, because the executor may already have started two threads that try to synchronize on the same name. However, you could try to enforce the execution order at the executor level. See, for example, this post: Java Executors: how can I prioritize a task? (which refers to a PriorityBlockingQueue passed to the executor).
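For reference, passing a PriorityBlockingQueue to an executor might look like the sketch below. `SequencedTask` and `PriorityExecutorFactory` are hypothetical names, and the sequence number is an assumed stand-in for arrival order. The queue only controls the order in which waiting tasks are handed to workers, so with more than one worker thread two tasks for the same name can still run at the same time. On its own this does not solve the per-name ordering problem.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical task type: lower sequence numbers are dequeued first.
class SequencedTask implements Runnable, Comparable<SequencedTask> {
    final long sequence;
    final Runnable body;

    SequencedTask(long sequence, Runnable body) {
        this.sequence = sequence;
        this.body = body;
    }

    @Override
    public void run() {
        body.run();
    }

    @Override
    public int compareTo(SequencedTask other) {
        return Long.compare(sequence, other.sequence);
    }
}

class PriorityExecutorFactory {
    // Builds a fixed-size pool whose waiting tasks are ordered by compareTo.
    // Callers should use execute(), not submit(): submit() wraps the task in a
    // FutureTask, which is not Comparable and makes the queue throw.
    static ThreadPoolExecutor create(int threads) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
    }
}
```

With `create(1)` the waiting tasks run strictly in sequence order, but that gives up the parallelism the question asks for.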

-1

Source: https://habr.com/ru/post/952145/

