Testing a Spring @Async method

I am trying to verify that Spring's @Async annotation works as expected in my project, but it does not.

I have this test:

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = GlobalConfiguration.class)
    public class ActivityMessageListenerTest {

        @Autowired
        private ActivityMessageListener activityMessageListener;

        private Long USER_ID = 1L;
        private Long COMPANY_ID = 2L;
        private Date DATE = new Date(10000000);
        private String CLASSNAME = "className";
        private Long CLASSPK = 14L;
        private Integer TYPE = 22;
        private String EXTRA_DATA = "extra";
        private Long RECIVED_USER_ID = 99L;

        @Before
        public void setup() throws Exception {
        }

        @Test
        public void testDoReceiveWithException() throws Exception {
            System.out.println("Current thread " + Thread.currentThread().getName());

            Map<String, Object> values = new HashMap<>();
            values.put(ActivityMessageListener.PARAM_USER_ID, USER_ID);
            values.put(ActivityMessageListener.PARAM_COMPANY_ID, COMPANY_ID);
            values.put(ActivityMessageListener.PARAM_CREATE_DATE, DATE);
            values.put(ActivityMessageListener.PARAM_CLASS_NAME, CLASSNAME);
            values.put(ActivityMessageListener.PARAM_CLASS_PK, CLASSPK);
            values.put(ActivityMessageListener.PARAM_TYPE, TYPE);
            values.put(ActivityMessageListener.PARAM_EXTRA_DATA, EXTRA_DATA);
            values.put(ActivityMessageListener.PARAM_RECEIVED_USER_ID, RECIVED_USER_ID);

            Message message = new Message();
            message.setValues(values);

            MessageBusUtil.sendMessage(MKTDestinationNames.ACTIVITY_REGISTRY, message);
        }
    }

As you can see, I am printing the name of the current thread. This is the class containing the @Async method:

    public class ActivityMessageListener extends BaseMessageListener {

        public static final String PARAM_USER_ID = "userId";
        public static final String PARAM_COMPANY_ID = "companyId";
        public static final String PARAM_CREATE_DATE = "createDate";
        public static final String PARAM_CLASS_NAME = "className";
        public static final String PARAM_CLASS_PK = "classPK";
        public static final String PARAM_TYPE = "type";
        public static final String PARAM_EXTRA_DATA = "extraData";
        public static final String PARAM_RECEIVED_USER_ID = "receiverUserId";

        public ActivityMessageListener() {
            MessageBusUtil.addQueue(MKTDestinationNames.ACTIVITY_REGISTRY, this);
        }

        @Override
        @Async(value = "activityExecutor")
        public void doReceive(Message message) throws Exception {
            System.out.println("Current " + Thread.currentThread().getName());

            if (1 > 0)
                throw new RuntimeException("lalal");

            Map<String, Object> parameters = message.getValues();
            Long userId = (Long) parameters.get(ActivityMessageListener.PARAM_USER_ID);
            Long companyId = (Long) parameters.get(ActivityMessageListener.PARAM_COMPANY_ID);
            Date createDate = (Date) parameters.get(ActivityMessageListener.PARAM_CREATE_DATE);
            String className = (String) parameters.get(ActivityMessageListener.PARAM_CLASS_NAME);
            Long classPK = (Long) parameters.get(ActivityMessageListener.PARAM_CLASS_PK);
            Integer type = (Integer) parameters.get(ActivityMessageListener.PARAM_TYPE);
            String extraData = (String) parameters.get(ActivityMessageListener.PARAM_EXTRA_DATA);
            Long receiverUserId = (Long) parameters.get(ActivityMessageListener.PARAM_RECEIVED_USER_ID);

            ActivityLocalServiceUtil.addActivity(userId, companyId, createDate, className,
                    classPK, type, extraData, receiverUserId);
        }
    }

Here I print the name of the current thread inside the @Async method, and it is the same as before: main. So the method is not being executed asynchronously.

Global configuration:

    @Configuration
    @EnableAspectJAutoProxy
    @EnableTransactionManagement
    @ComponentScan({ "com.shn.configurations", ...some packages... })
    public class GlobalConfiguration {...}

Inside one of the scanned packages there is a configuration class that defines the activityExecutor bean:

    @Configuration
    @EnableAsync(proxyTargetClass = true)
    public class ExecutorConfiguration {

        @Bean
        public ActivityMessageListener activityMessageListener() {
            return new ActivityMessageListener();
        }

        @Bean
        public TaskExecutor activityExecutor() {
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setCorePoolSize(10);
            threadPoolTaskExecutor.setMaxPoolSize(10);
            threadPoolTaskExecutor.setQueueCapacity(100);
            return threadPoolTaskExecutor;
        }
    }

What am I doing wrong?

1 answer

Tricky

Asynchronous behavior is added through proxying.

Spring provides a proxy that wraps the actual object and makes the actual call in a separate thread.

It looks something like this (except that most of it is done dynamically with CGLIB or JDK proxies and Spring handlers):

    class ProxyListener extends ActivityMessageListener {

        private ActivityMessageListener real;
        TaskExecutor executor; // injected

        public ProxyListener(ActivityMessageListener real) {
            this.real = real;
        }

        @Override
        public void doReceive(Message message) throws Exception {
            // hand the real call over to another thread
            executor.execute(() -> {
                try {
                    real.doReceive(message);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

    ActivityMessageListener real = new ActivityMessageListener();
    ProxyListener proxy = new ProxyListener(real);

Now, in the Spring world, you will have a reference to the proxy object, not to the ActivityMessageListener itself. That is,

    ActivityMessageListener proxy = applicationContext.getBean(ActivityMessageListener.class);

will return a reference to the ProxyListener. Then, through polymorphism, a doReceive call goes to the overridden ProxyListener#doReceive, which calls ActivityMessageListener#doReceive through delegation, and you get your asynchronous behavior.
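The delegation described above can be reproduced without Spring. The following is only a minimal sketch using a JDK dynamic proxy and a plain ExecutorService; the Listener interface, the RecordingListener class and the thread name "activityExecutor-1" are hypothetical stand-ins, and Spring's real proxies are generated differently.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncProxySketch {

    // Hypothetical stand-in for the listener type (not the real BaseMessageListener)
    public interface Listener {
        void doReceive(String message);
    }

    // Records the name of the thread that actually ran the method body
    public static class RecordingListener implements Listener {
        final AtomicReference<String> threadName = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void doReceive(String message) {
            threadName.set(Thread.currentThread().getName());
            done.countDown();
        }
    }

    // Wraps the target in a JDK dynamic proxy that hands each call to the executor.
    // Only meant for void methods such as doReceive.
    public static Listener asyncProxy(Listener target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            executor.execute(() -> {
                try {
                    method.invoke(target, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            });
            return null; // void method: nothing to hand back to the caller
        };
        return (Listener) Proxy.newProxyInstance(
                Listener.class.getClassLoader(), new Class<?>[] {Listener.class}, handler);
    }

    // Calls doReceive through the proxy and returns the thread that executed it
    public static String threadUsedViaProxy() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "activityExecutor-1"));
        try {
            RecordingListener real = new RecordingListener();
            asyncProxy(real, executor).doReceive("hello");
            try {
                real.done.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return real.threadName.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Caller thread: " + Thread.currentThread().getName());
        System.out.println("doReceive ran on: " + threadUsedViaProxy());
    }
}
```

Calling real.doReceive directly, instead of going through the proxy, would run the body on the caller's thread, which is the situation in the question.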

However, you are only halfway into the Spring world.

Here

    public ActivityMessageListener() {
        MessageBusUtil.addQueue(MKTDestinationNames.ACTIVITY_REGISTRY, this);
    }

the this reference actually refers to the real ActivityMessageListener, not to the proxy. Therefore when, presumably, you send your message over the bus here

    MessageBusUtil.sendMessage(MKTDestinationNames.ACTIVITY_REGISTRY, message);

you send it to the real object, which has none of the proxy's asynchronous behavior.

The full Spring solution would be for the MessageBus (and/or its queue) to be Spring beans, into which you can inject the fully processed (proxied, autowired, initialized) beans.
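The difference between registering the raw object and registering the proxy can be sketched in plain Java. Everything here is a hypothetical stand-in: Bus is not the real MessageBusUtil, AsyncListener is a hand-written substitute for the Spring proxy, and the destination name is made up. In a Spring setup, the equivalent change would be to move the addQueue call out of the constructor and into code that receives the injected bean.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BusRegistrationSketch {

    // Hypothetical listener type
    public interface Listener {
        void doReceive(String message);
    }

    // Hypothetical bus: one listener per destination, invoked on the sender's thread
    public static class Bus {
        private final Map<String, Listener> queues = new ConcurrentHashMap<>();

        public void addQueue(String destination, Listener listener) {
            queues.put(destination, listener);
        }

        public void sendMessage(String destination, String message) {
            queues.get(destination).doReceive(message);
        }
    }

    // Records which thread handled the message
    public static class RecordingListener implements Listener {
        volatile String threadName;
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void doReceive(String message) {
            threadName = Thread.currentThread().getName();
            done.countDown();
        }
    }

    // Hand-written substitute for the async proxy
    public static class AsyncListener implements Listener {
        private final Listener real;
        private final ExecutorService executor;

        public AsyncListener(Listener real, ExecutorService executor) {
            this.real = real;
            this.executor = executor;
        }

        @Override
        public void doReceive(String message) {
            executor.execute(() -> real.doReceive(message));
        }
    }

    // Sends one message and returns the name of the thread that handled it.
    // registerProxy = false mimics registering `this` from the constructor.
    public static String handlingThread(boolean registerProxy) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "activityExecutor-1"));
        try {
            Bus bus = new Bus();
            RecordingListener real = new RecordingListener();
            Listener registered = registerProxy ? new AsyncListener(real, executor) : real;
            bus.addQueue("activity_registry", registered);
            bus.sendMessage("activity_registry", "hello");
            try {
                real.done.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return real.threadName;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("raw object registered: " + handlingThread(false));
        System.out.println("proxy registered:      " + handlingThread(true));
    }
}
```

With the raw object registered, the message is handled on the sending thread; with the wrapper registered, it is handled on the executor's thread.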


In fact, since CGLIB proxies are really just subclasses of your types, the ProxyListener described above would also add itself to the bus, because the super constructor is called. It would seem, though, that only one MessageListener can register itself under a key such as MKTDestinationNames.ACTIVITY_REGISTRY. If that is not the case, you will need to show more of that code for an explanation.


In your test, if you do

    activityMessageListener.doReceive(message);

you should see the asynchronous behavior, since the injected activityMessageListener field should hold a reference to the proxy.


Source: https://habr.com/ru/post/981897/

