Dispose of the child stream according to the identifier

I generate a child stream when I receive data from the user.

What are the steps if I want to get rid of the previous user child thread, if the same user sends data again and wants to create a new user child thread again?

+5
source share
3 answers

That's right, so java cannot get rid of the thread, the thread just starts until it terminates. So: To get rid of a thread, you need to enable the method to start threads, and then get rid of all references to Thread and any Runnable with which it is designed.

You want to include the finishing thread, a simple example:

class SimpleRunnable implements Runnable { public volatile boolean run = true; //Volatile for thread safety. public void run() { while(run) { System.out.println("WHOOOO!"); //Boy, will this be annoying } } } 

Creating a thread from this runnable:

 SimpleRunnable run = new SimpleRunnable(); Thread thread = new Thread(run); Thread.start(); //run thread //Stop thread run.run=false; //Thread will be removed when out of scope 

You need to create a Runnable for each user in your case, and then call to set a stop variable when creating a new thread. For example, you can store each runnable in ConcurrentHashMap by userId.

 ConcurrentHashMap<String,SimpleRunnable> runnablesByUser = new ConcurrentHashMap<>(); public void startNewThreadForUser(String userId){ //Time passes, retrieve and kill old thread: SimpleRunnable oldRunnable = runnableByUser.get(userId); if(oldRunnable!=null){ oldRunnable.run=false; } SimpleRunnable newRunnableUserOne = new SimpleRunnable(); runnablesByUser.put(userId,newRunnableUserOne); Thread thread = new Thread(newRunnableUserOne); thread.start(); } 

The method call will then destroy the old thread, if it is found, release the old one from the scope, replacing it with a new one in ConcurrentHashMap and finally starting a new thread. For instance:

 public void startThreeThreads(){ startNewThreadForUser("User1");//starts Thread for User1 startNewThreadForUser("User2");//starts Thread for User2 startNewThreadForUser("User1");//Replaces Thread for User1 } 

Running threads are usually managed in the thread pool, and this is rude in many ways, but hopefully it is useful.

I can develop this mechanism if you want.

+3
source

Starting a new stream every time you receive data from the user will lead to the exhaustion of resources, in addition, causing unnecessary overhead for managing too many threads. Your computer has a limited number of threads that can be launched at any given time and are limited by your processor. to find out this number, you can use the command

 Runtime.getRuntime().availableProcessors() 

on the other hand, if the jobs you want to process require a lot of I / O processing, you must run a few more threads than "Runtime.getRuntime (). availableProcessors ()", or you will be under - using your processor.

what I would do is use the "ExecutorService", which will handle the threads for you (there is no need to manually start, stop the threads). Just run "ExecutorService" with the total number of threads that you want to execute at the same time, and then each time you get more work from the User, send a new task (like Callable) to the ExecutorService. The executing service will handle the execution of this task for you, and once it is completed, it will become available for garbage collection.

for example, see the following code:

 import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class MultipleClientsExample { public static final int TOTAL_THREADS_TO_PROCESS_CUSTOMER_WORK = 4; public static final Random random = new Random(); public static int customerCounter = 0; public static void main(String[] args) throws InterruptedException { MultipleClientsExample multipleClientsExample = new MultipleClientsExample(); multipleClientsExample.doTheWork(); } private void doTheWork() throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREADS_TO_PROCESS_CUSTOMER_WORK); while (customerCounter < 10) { try { CustomerInput customerInput = getWorkFromCustomer(); System.out.println("main program. received work from customer: " + customerInput.getCustomerId()); executorService.submit(new WorkToBeDone(customerInput.getCustomerId(), customerInput.getWorkInfo())); } catch (InterruptedException e) { break; } customerCounter++; } executorService.shutdown(); executorService.awaitTermination(5, TimeUnit.SECONDS); } private CustomerInput getWorkFromCustomer() throws InterruptedException { while (true) { String customerId = String.valueOf(random.nextInt(10)); CustomerInput customerInput = new CustomerInput(customerId, "work from customer: " + customerId); return customerInput; } } } class WorkToBeDone implements Callable<Void> { private String clientId; private String workInfo; public WorkToBeDone(String clientId, String workInfo) { this.clientId = clientId; this.workInfo = workInfo; } @Override public Void call() throws Exception { System.out.println("inside a working thread: it is going to do the work of customer: " + clientId); try { Thread.sleep(5000); } catch (InterruptedException e) { System.out.println("worker processing job from customer: " + clientId + " was interrupted. ending now"); return null; } System.out.println("work completed for customer: " + clientId); return null; } } class CustomerInput { private String customerId; private String workInfo; public CustomerInput(String customerId, String workInfo) { this.customerId = customerId; this.workInfo = workInfo; } public String getCustomerId() { return customerId; } public String getWorkInfo() { return workInfo; } } 
+2
source

If you want to cancel a task that has already been sent to the thread pool, you will need to continue to reference the future values ​​of each task and do not forget to delete the link to the completed tasks and which you canceled, so they are ready to collect garbage (otherwise you will have a leak memory).

eg

 import java.util.Iterator; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class MultipleClientsExample { public static final int TOTAL_THREADS_TO_PROCESS_CUSTOMER_WORK = 4; public static int customerCounter = 0; public static void main(String[] args) throws InterruptedException { MultipleClientsExample multipleClientsExample = new MultipleClientsExample(); multipleClientsExample.doTheWork(); } private void doTheWork() throws InterruptedException { final ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREADS_TO_PROCESS_CUSTOMER_WORK); Map<String, Future<String>> map = new ConcurrentHashMap<>(); while (customerCounter < 11) { try { WorkToBeDone workToBeDone = getWorkFromCustomer(); System.out.println("main program. received work from customer: " + workToBeDone.getClientId()); Future<String> resultFuture = executorService.submit(workToBeDone); map.put(workToBeDone.getClientId(), resultFuture); } catch (InterruptedException e) { break; } customerCounter++; } // cancel job of customer with id: 10 Future<String> resultFuture = map.get("10"); System.out.println("cancelling job of customerId: 10"); resultFuture.cancel(true); // remove references of all completed jobs Thread.sleep(2000); System.out.println("looking for jobs that completed or were cancelled."); Iterator<Map.Entry<String, Future<String>>> iterator = map.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, Future<String>> entry = iterator.next(); if (entry.getValue().isCancelled() || entry.getValue().isDone()) { System.out.println("removing reference of job for customer: " + entry.getKey()); iterator.remove(); } } // simpler way to remove entries from map (but doesn't print output of jobs removed from map) // map.entrySet().removeIf(entry -> entry.getValue().isCancelled() || entry.getValue().isDone()); executorService.shutdown(); executorService.awaitTermination(5, TimeUnit.SECONDS); } private WorkToBeDone getWorkFromCustomer() throws InterruptedException { String customerId = String.valueOf(customerCounter); WorkToBeDone workToBeDone = new WorkToBeDone(customerId, "work from customer: " + customerId); return workToBeDone; } } class WorkToBeDone implements Callable<String> { private String clientId; private String workInfo; public String getClientId() { return clientId; } public WorkToBeDone(String clientId, String workInfo) { this.clientId = clientId; this.workInfo = workInfo; } @Override public String call() throws Exception { System.out.println("inside a working thread: it is going to do the work of customer: " + clientId); try { Thread.sleep(2000); } catch (InterruptedException e) { System.out.println("worker processing job from customer: " + clientId + " was interrupted. ending now"); return clientId; } System.out.println("work completed for customer: " + clientId); return clientId; } } 
+2
source

Source: https://habr.com/ru/post/1262928/


All Articles