Mule ESB - two files as input (waiting for both)

I want to create a flow that will:

  • Wait for two files, file_name.xdf and file_name.dff, and only start processing once both have arrived (both files always share the same base name)
  • convert both files to byte arrays
  • run a Groovy script

How can I implement the first point?

+6
3 answers

You can aggregate the files based on their "base name" (that is, the file name without the extension) and then process each file in the aggregated set.

Aggregation can be done with either a Collection Aggregator or a Custom Aggregator; here is an example of each:

Using a Collection Aggregator:

<flow name="two-files-per-process-with-collection-aggregator">
    <file:inbound-endpoint path="/file-in" moveToDirectory="/file-in-process"
                           responseTimeout="10000" doc:name="Read files">
        <file:filename-regex-filter pattern=".*\.aaa|.*\.bbb" caseSensitive="true"/>
    </file:inbound-endpoint>
    <set-property propertyName="MULE_CORRELATION_ID"
                  value="#[message.inboundProperties.originalFilename.substring(0,message.inboundProperties.originalFilename.lastIndexOf('.'))]"
                  doc:name="Set Correlation-Id"/>
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="2" doc:name="Set Correlation-Group-Size"/>
    <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
    <foreach doc:name="For Each">
        <logger message="#[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
    </foreach>
</flow>


Using a Custom Aggregator (this requires a custom Java class):

<flow name="two-files-per-process-with-custom-aggregator">
    <file:inbound-endpoint path="/file-in" moveToDirectory="/file-in-process"
                           responseTimeout="10000" doc:name="Read files">
        <file:filename-regex-filter pattern=".*\.aaa|.*\.bbb" caseSensitive="true"/>
    </file:inbound-endpoint>
    <custom-aggregator failOnTimeout="true" class="org.mnc.MatchFileNames" doc:name="Custom Aggregator"/>
    <foreach doc:name="For Each">
        <logger message="#[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
    </foreach>
</flow>

And here is a possible implementation of the custom aggregator (it could certainly be more elegant):

package org.mnc;

import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.routing.RoutingException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.EventCorrelatorCallback;

public class MatchFileNames extends AbstractAggregator {

    @Override
    protected EventCorrelatorCallback getCorrelatorCallback(final MuleContext muleContext) {
        return new EventCorrelatorCallback() {

            // Aggregate as soon as both files of a pair have arrived.
            @Override
            public boolean shouldAggregateEvents(EventGroup events) {
                return events.size() == 2;
            }

            // Group events by the file name without its extension.
            @Override
            public EventGroup createEventGroup(MuleEvent event, Object id) {
                String filename = event.getMessage().getInboundProperty("originalFilename");
                String commonFilename = filename.substring(0, filename.lastIndexOf('.'));
                System.out.println(filename + " -> " + commonFilename);
                return new EventGroup(commonFilename, muleContext, 2, true, storePrefix);
            }

            @Override
            public MuleEvent aggregateEvents(EventGroup events) throws RoutingException {
                return events.getMessageCollectionEvent();
            }
        };
    }
}
+2

Use a Quartz component to trigger your flow at a specified interval. Read more about it here: http://www.mulesoft.org/documentation/display/current/Quartz+Transport+Reference

When the job triggers, run Java code that compares the two directories and finds the pairs of files between them.

Off the top of my head, I'm not sure whether the filter for incoming files can be configured dynamically. Otherwise, you can always handle the entire process in Java: read the files, convert them to byte arrays, and keep passing the message on to the Groovy script.
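For illustration, here is a minimal sketch of the pairing step in plain Java, assuming the .xdf and .dff files land in two directories you control; the package, class, and method names are placeholders and are not part of the original answer:

package org.example; // hypothetical helper class

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FilePairFinder {

    // Returns pairs of files (one .xdf, one .dff) that share the same base name.
    public List<File[]> findPairs(File xdfDir, File dffDir) {
        List<File[]> pairs = new ArrayList<File[]>();
        File[] xdfFiles = xdfDir.listFiles();
        File[] dffFiles = dffDir.listFiles();
        if (xdfFiles == null || dffFiles == null) {
            return pairs; // one of the directories does not exist or is unreadable
        }

        // Index the .xdf files by base name (file name without extension).
        Map<String, File> xdfByBaseName = new HashMap<String, File>();
        for (File f : xdfFiles) {
            if (f.getName().endsWith(".xdf")) {
                xdfByBaseName.put(baseName(f.getName()), f);
            }
        }

        // A .dff file completes a pair when an .xdf file with the same base name exists.
        for (File f : dffFiles) {
            if (f.getName().endsWith(".dff")) {
                File match = xdfByBaseName.get(baseName(f.getName()));
                if (match != null) {
                    pairs.add(new File[] { match, f });
                }
            }
        }
        return pairs;
    }

    private String baseName(String fileName) {
        return fileName.substring(0, fileName.lastIndexOf('.'));
    }
}

Each matched pair can then be read into byte arrays and passed on to the Groovy script.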

0

You can have two file inbound endpoints, one for each of the expected files. When your flow reads a file, it copies it to another directory. If the other file has already been processed and moved to that directory (you can track this with a flag in the object store), you save the current file as name.ready and rename the previously moved file to name2.ready.

A third flow with a file inbound endpoint then reads from this directory using a *.ready filename pattern, and uses the Mule Requester module to load the other file into a variable.
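As a rough illustration of the rename step, here is a minimal plain-Java sketch, assuming a single staging directory and the .xdf/.dff extensions from the question; the class and method names are placeholders, and in a real flow the object-store flag described above would replace the simple file-existence check:

package org.example; // hypothetical helper class

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class ReadyMarker {

    // Called after a file has been copied to the staging directory.
    // If its sibling (same base name, other extension) is already there,
    // rename both so a *.ready filter in a third flow can pick them up.
    public void markIfPairComplete(Path stagingDir, String fileName) throws IOException {
        String baseName = fileName.substring(0, fileName.lastIndexOf('.'));
        boolean isXdf = fileName.endsWith(".xdf");
        String siblingName = baseName + (isXdf ? ".dff" : ".xdf");

        Path current = stagingDir.resolve(fileName);
        Path sibling = stagingDir.resolve(siblingName);

        if (Files.exists(sibling)) {
            Files.move(current, stagingDir.resolve(baseName + ".ready"),
                    StandardCopyOption.REPLACE_EXISTING);
            Files.move(sibling, stagingDir.resolve(baseName + "2.ready"),
                    StandardCopyOption.REPLACE_EXISTING);
        }
    }
}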

0
