Setting a circuit breaker with a service activator

Configuration Details:

  <int:publish-subscribe-channel id="toKafka"/>

  <int:publish-subscribe-channel id="sendMessageToKafkaChannel"/>

  <int:service-activator input-channel="toKafka" output-channel="sendMessageToKafkaChannel" order="1" ref="conditionalProducerService" method="producerCircuitBreaker">
       <int:request-handler-advice-chain> 
              <ref bean="circuitBreakerAdvice" />
       </int:request-handler-advice-chain>
  </int:service-activator>

  <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext"
                                                                       auto-startup="true" channel="toKafka" message-key="kafka_messageKey"/>

  <bean id="circuitBreakerAdvice" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
    <property name="threshold" value="2"/>                      
    <property name="halfOpenAfter" value="15000" />                     
  </bean>

  public Message<?> producerCircuitBreaker(Message<?> payload) {
      throw  new RuntimeException("foo Pro");
  }

  for(int i=0;i<4;i++){
          toKafka.send(MessageBuilder
                              .withPayload(messageVO.getMessageContentVO())                                  
                              .setHeader(KafkaHeaders.TOPIC, topic)
                              .setHeader(KafkaHeaders.PARTITION_ID,Integer.parseInt(messageVO.getPartition())).
                              build());

                 APPLOGGER.info("sending message");
   }

We expect that the process will end with an error 2 times with an exception, and then with the "circuit breaker" tripping, but it just stops after selecting an exception in the console.

Just as we can configure the error channel here.

https://gist.github.com/anonymous/67aae50e548c78470cd0

updated configuration:

<int:service-activator input-channel="toKafka"  ref="gw">
    <int:request-handler-advice-chain> <ref bean="circuitBreakerAdvice"/>
                      </int:request-handler-advice-chain> 
   </int:service-activator>

   <int:channel id="failedChannel1" />


   <int:gateway id="gw" default-request-channel="toKafka" default-reply-timeout="0" error-channel="failedChannel1"  />

     <int:chain input-channel="failedChannel1">
        <int:transformer expression="'failed:'+payload.failedMessage.payload+ ' with a' +payload.cause.message" />
        <int-stream:stderr-channel-adapter append-newline="true"/>
            </int:chain>  

falls below the exception.
failed: TestVo [data = sample message]] with Cannot process the message.
https://gist.github.com/anonymous/921be7691c41d125dc84
however it works with the same message differently. (message content intentionally changed)

: . broker-list/value-class-type , , .

, , CB , .

value-class: CB , , 1 .

: TestVo [data = {tes message}}] , xx.xxx.vo.TestVo java.lang.String

.

: .

https://gist.github.com/anonymous/6ece517fb5e82ac73492

: CB .

  <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext"
                                                                       auto-startup="true" channel="toKafka" message-key="kafka_messageKey"/>  

  <int-kafka:producer-context id="producerContext" producer-properties="producerProperties">
                                                            <int-kafka:producer-configurations>
                                                                           <int-kafka:producer-configuration
                                                                                            broker-list="1.2.3:9092" topic="headers['topic']" key-class-type="java.lang.String"
                                                                                            value-class-type="java.lang.String"
                                                                                            value-encoder="kafkaEncoder" key-encoder="kafkaKeyEncoder"
                                                                                            compression-type="none" />
                                                            </int-kafka:producer-configurations>
                                            </int-kafka:producer-context>
+4
1

try {...} send().

RuntimeException; .

Messaging Gateway .

...

<int:service-activator input-channel="toKafka"  ref="gw">
    <int:request-handler-advice-chain> <ref bean="circuitBreakerAdvice"/>
                  </int:request-handler-advice-chain> 
</int:service-activator>

<int:gateway id="gw" default-request-channel="toKafka" default-reply-timeout="0" error-channel="failedChannel1"  />

toKafka, , toKafka .

.

+3

Source: https://habr.com/ru/post/1621464/


All Articles