How to work with JSON message with spring -rabbit application in spring to load?

Here are my code snippets.

  • MQConfiguration class for configuration

    @Configuration
    public class MQConfiguration {
        @Bean
        public Receiver receiver() {
            return new Receiver();
        }
    }
    
  • Receiver class for working with receiving messages

    @RabbitListener(queues = "testMQ")
    public class Receiver {
    
        @RabbitHandler
        public void receive(Message msg){
            System.out.println(msg.toString());
        }
    }
    
  • And here is the JSON message that I sent to RabbitMQ

    {
        "id": 1,
        "name": "My Name",
        "description": "This is description about me"
    }
    

However, when I started my application, I received the following error message.

2017-02-28 17:16:35.931  WARN 11828 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:186) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1227) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:683) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1181) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1367) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.springframework.amqp.AmqpException: No method found for class [B
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:127) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:224) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:61) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:140) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    ... 10 common frames omitted

So what should I do if all I want to do is print a JSON message in a method receive()? I would really appreciate that anyone can shed light on this. :)

+4
source share
2 answers

If you are using Spring Boot, you just need to configure:

@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

Otherwise, you need to configure:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
    return factory;
}

http://docs.spring.io/spring-amqp/docs/1.7.0.RELEASE/reference/html/_reference.html#async-annotation-driven

0

JSON RabbitMQ Spring Boot, content_type.

, Python Producer Java- ( JSON Python RabbitMQ Spring Boot Java JSON).

:

1: JSON Jakson GSON

content_type = "text/plain" JSON . Spring fuction ​​ .

RabbitHandler:

@RabbitHandler
public void receive(String inputString) throws IOException {
    ObjectMapper objectMapper = new ObjectMapper();
    SimStatusReport theResult = objectMapper.readValue(inputString, SimStatusReport.class);

    System.out.println("String instance "  + theResult.toString() +
            " [x] Received");
}

SimStatusReport:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SimStatusReport {
    private String id;
    private int t;
}

Python:

import pika
import json
import uuid


connectionResult = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channelResult = connectionResult.channel()
routing_key_result = 'sim_results'
channelResult.queue_declare(queue=routing_key_result, durable=True)

def publish_result(sim_status):
    message =json.dumps(sim_status)
    channelResult.basic_publish(exchange='',
                                routing_key=routing_key_result,
                                body=message,
                                properties=pika.BasicProperties(
                                    content_type="text/plain",
                                    content_encoding= 'UTF-8',
                                    delivery_mode=2,  # make message persistent
                          ))
    print("Sent ", message)


newsim_status = {'id': str(uuid.uuid4()), 't': 0}
publish_result(newsim_status)

2: JSON Jackson2JsonMessageConverter .

content_type = "application/json". __TypeId__ RabbitMQ. , .

, Python ( publish_result):

def publish_result(sim_status):
    message =json.dumps(sim_status)
    channelResult.basic_publish(exchange='',
                                routing_key=routing_key_result,
                                body=message,
                                properties=pika.BasicProperties(
                                    content_type="application/json"
                                    headers={'__TypeId__': 'com.zarinbal.simtest.run.model.SimStatusReport'},
                                    content_encoding= 'UTF-8',
                                    delivery_mode=2,  # make message persistent
                          ))
    print("Sent ", message)

Java Jackson2JsonMessageConverter:

@Configuration
    public class RabbitConfiguration {
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }

:

@RabbitListener(queues = "sim_results")
public class TaskReceiver {
    @RabbitHandler
    public void receive(SimStatusReport in) {
        System.out.println("Object instance "  + in +
                " [x] Received");
    }
}

: , . @Data, @NoArgsConstructor @AllArgsConstructor lombok,

0

Source: https://habr.com/ru/post/1671014/


All Articles