You can aggregate files based on their "base name" (that is, take the file name without the extension), and then process each file in an aggregated set.
Aggregation can be performed using Collection-Aggregator or Custom-Aggregator, here is an example for each of them:
Using Collect-Aggregator:
<flow name="two-files-per-process-with-collection-aggregator"> <file:inbound-endpoint path="/file-in" moveToDirectory="/file-in-process" responseTimeout="10000" doc:name="Read files" > <file:filename-regex-filter pattern=".*\.aaa|.*\.bbb" caseSensitive="true"/> </file:inbound-endpoint> <set-property propertyName="MULE_CORRELATION_ID" value="#[message.inboundProperties.originalFilename.substring(0,message.inboundProperties.originalFilename.lastIndexOf('.'))]" doc:name="Set Correlation-Id"/> <set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="2" doc:name="Set Correlation-Group-Size"/> <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/> <foreach doc:name="For Each"> <logger message="#[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/> </foreach> </flow>
Using Custom-Aggregator (you'll need a custom java class):
<flow name="two-files-per-process-with-custom-aggregator"> <file:inbound-endpoint path="/file-in" moveToDirectory="/file-in-process" responseTimeout="10000" doc:name="Read files"> <file:filename-regex-filter pattern=".*\.aaa|.*\.bbb" caseSensitive="true"/> </file:inbound-endpoint> <custom-aggregator failOnTimeout="true" class="org.mnc.MatchFileNames" doc:name="Custom Aggregator"/> <foreach doc:name="For Each"> <logger message="#[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/> </foreach> </flow>
And this is a possible implementation for a custom aggregator (it can be more elegant:
package org.mnc; import org.mule.api.MuleContext; import org.mule.api.MuleEvent; import org.mule.api.routing.RoutingException; import org.mule.routing.AbstractAggregator; import org.mule.routing.EventGroup; import org.mule.routing.correlation.EventCorrelatorCallback; public class MatchFileNames extends AbstractAggregator { @Override protected EventCorrelatorCallback getCorrelatorCallback(final MuleContext muleContext) { return new EventCorrelatorCallback() { @Override public boolean shouldAggregateEvents(EventGroup events) { return events.size()==2; } @Override public EventGroup createEventGroup(MuleEvent event, Object id) { String filename = event.getMessage().getInboundProperty("originalFilename"); String commonFilename = filename.substring(0, filename.lastIndexOf('.')); System.out.println(filename + " -> " + commonFilename); return new EventGroup(commonFilename, muleContext, 2, true, storePrefix); } @Override public MuleEvent aggregateEvents(EventGroup events) throws RoutingException { return events.getMessageCollectionEvent(); } }; } }
source share