So this is my current setup:
<int-amqp:inbound-channel-adapter channel="input-channel" queue-names="probni" message-converter="jsonMessageConverter"
channel-transacted="true"
transaction-manager="dataSourceTransactionManager"/>
<int:chain input-channel="input-channel" output-channel="inputc1">
<int:service-activator ref="h1" method="handle" />
<int:service-activator ref="h2" method="handle" />
<int:service-activator ref="h3" method="handle" />
<int:splitter />
</int:chain>
<int:publish-subscribe-channel id="inputc1"/>
<int:claim-check-in input-channel="inputc1" output-channel="nullChannel" message-store="messageStore" order="1" />
<int:bridge input-channel="inputc1" output-channel="inputc2" order="2" />
<int:publish-subscribe-channel id="inputc2" task-executor="taskExecutor" />
<int-amqp:outbound-channel-adapter channel="inputc2" exchange-name="exch" amqp-template="rabbitTemplate" order="1" />
<int:service-activator input-channel="inputc2" output-channel="nullChannel"
expression="@messageStore.removeMessage(headers['id'])" order="2" />
and the image of this:

What I need is that the transaction (transaction-manager = "dataSourceTransactionManager", which should be part of pink), is executed completely before any message is sent to int: brdige or from int: bridge (blue arrow - which should actually forward messages only if jdbc committed).
Thank!
UPDATE
Here is a description of why I need this setup:
: amqp, db amqp . ( , , , ..).
, , , ββ , .
:
thread1:
- AMQP
- AMQP - (, db, )
- AMQP ( ) -
- COMMIT
- ACK AMQP
- Forward outgoing message to THREAD2 in process (not DB pointer, but real message)
thread2:
- Receiving a message in progress from THREAD1
- Try to send message AMQP
- Upon successful sending, delete the outgoing AMQP message from the database, which is saved in step 3 of THREAD1.
THREAD3:
- Polling protocol for sent messages (from THREAD1 step 3) (every 10 seconds)
- If any new message is found, mark it for sending in the next poll (meanwhile THREAD2 may delete this message)
- If the message still exists in the second poll (after 20 seconds), then THREAD2 could not send it, so here we create a new thread that will perform the same task as THREAD2.
UPDATE 2
Tried this setting, but had some problems:
<int:transaction-synchronization-factory id="transactionSynchronizationFactory">
<int:after-commit expression="payload" channel="committed-channel" />
</int:transaction-synchronization-factory>
<int-amqp:inbound-channel-adapter channel="input-channel" queue-names="probni" message-converter="jsonMessageConverter"
channel-transacted="true"
transaction-manager="dataSourceTransactionManager" advice-chain="amqpMethodInterceptor"/>
and
@Component
public class AmqpMethodInterceptor implements MethodInterceptor {
private TransactionSynchronizationFactory factory;
public AmqpMethodInterceptor(TransactionSynchronizationFactory factory){
this.factory = factory;
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronization synchronization = factory.create("123");
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
Object result = invocation.proceed();
return result;
}
}
Calledafter-commit, null, , commit. , ?