Spring - commit transaction integration before the rest of the pipeline

So this is my current setup:

<int-amqp:inbound-channel-adapter channel="input-channel" queue-names="probni" message-converter="jsonMessageConverter"
                                  channel-transacted="true"
                                  transaction-manager="dataSourceTransactionManager"/>
<int:chain input-channel="input-channel" output-channel="inputc1">
    <int:service-activator ref="h1" method="handle" />
    <int:service-activator ref="h2" method="handle" />
    <int:service-activator ref="h3" method="handle" />
    <int:splitter  />
</int:chain>

<int:publish-subscribe-channel id="inputc1"/>

<int:claim-check-in input-channel="inputc1" output-channel="nullChannel" message-store="messageStore" order="1" />

<int:bridge input-channel="inputc1" output-channel="inputc2" order="2" />

<int:publish-subscribe-channel id="inputc2" task-executor="taskExecutor" />

<int-amqp:outbound-channel-adapter channel="inputc2" exchange-name="exch" amqp-template="rabbitTemplate" order="1" />
<int:service-activator input-channel="inputc2" output-channel="nullChannel"
                       expression="@messageStore.removeMessage(headers['id'])" order="2" />

and the image of this:

enter image description here

What I need is that the transaction (transaction-manager = "dataSourceTransactionManager", which should be part of pink), is executed completely before any message is sent to int: brdige or from int: bridge (blue arrow - which should actually forward messages only if jdbc committed).

Thank!

UPDATE

Here is a description of why I need this setup:

: amqp, db amqp . ( , , , ..). , , , ​​ , .

:

thread1:

  • AMQP
  • AMQP - (, db, )
  • AMQP ( ) -
  • COMMIT
  • ACK AMQP
  • Forward outgoing message to THREAD2 in process (not DB pointer, but real message)

thread2:

  • Receiving a message in progress from THREAD1
  • Try to send message AMQP
  • Upon successful sending, delete the outgoing AMQP message from the database, which is saved in step 3 of THREAD1.

THREAD3:

  • Polling protocol for sent messages (from THREAD1 step 3) (every 10 seconds)
  • If any new message is found, mark it for sending in the next poll (meanwhile THREAD2 may delete this message)
  • If the message still exists in the second poll (after 20 seconds), then THREAD2 could not send it, so here we create a new thread that will perform the same task as THREAD2.

UPDATE 2

Tried this setting, but had some problems:

<int:transaction-synchronization-factory id="transactionSynchronizationFactory">
    <int:after-commit expression="payload" channel="committed-channel" />
</int:transaction-synchronization-factory>

<int-amqp:inbound-channel-adapter channel="input-channel" queue-names="probni" message-converter="jsonMessageConverter"
                                  channel-transacted="true"
                                  transaction-manager="dataSourceTransactionManager" advice-chain="amqpMethodInterceptor"/>

and

@Component
public class AmqpMethodInterceptor implements MethodInterceptor {

    private TransactionSynchronizationFactory factory;

    public AmqpMethodInterceptor(TransactionSynchronizationFactory factory){
        this.factory = factory;
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {


        if (TransactionSynchronizationManager.isActualTransactionActive()) {
            TransactionSynchronization synchronization = factory.create("123");
            TransactionSynchronizationManager.registerSynchronization(synchronization);
        }

        Object result = invocation.proceed();

        return result;
    }
}
Called

after-commit, null, , commit. , ?

+4
1

TransactionSynchronization.

"" TX, send(). , executor, , .

MethodInterceptor <int-amqp:inbound-channel-adapter> advice-chain. DefaultTransactionSynchronizationFactory ExpressionEvaluatingTransactionSynchronizationProcessor, afterCommitChannel.

Advice :

if (TransactionSynchronizationManager.isActualTransactionActive()) {
                TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(key);
                TransactionSynchronizationManager.registerSynchronization(synchronization);
}

key , TX.

UPDATE

null, , .

, TransactionSynchronizationFactory .

, , .

factory.create("123"); :

    DefaultTransactionalResourceSynchronization synchronization = new DefaultTransactionalResourceSynchronization(key);
    TransactionSynchronizationManager.bindResource(key, synchronization.getResourceHolder());
    return synchronization;

, TransactionSynchronizationManager.bindResource(). - , TX, :

IntegrationResourceHolder holder =
                    (IntegrationResourceHolder) TransactionSynchronizationManager.getResource("123");
holder.setMessage(message);

, :

 <int:outbound-channel-adapter 
        expression="T(org.springframework.transaction.support.TransactionSynchronizationManager).getResource('123').setMessage(#root)"/>

TX.

+1

Source: https://habr.com/ru/post/1663532/


All Articles