Spring Integration FTP keeps polling even when my application is stopped

This is a follow-up to the question -

The same file is picked up again and again by spring-integration-ftp, but with different names

I have the following configuration in my application.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Integration application context.

  Flows defined here:
    * ftpInbound / pqqFtpInbound : synchronise remote FTP files into the local
      "f" and "pqqReq" directories and publish them on ftpChannel.
    * filesIn1 / pqqInputFileChannel : poll those local directories and hand
      each file to the "handler" / "pqqHandler" service activators.
    * pqqOutputFileChannel / pl : poll the local "pqqResp" and "notifFile"
      directories; the ftpOutbound / notifFtpOutBound adapters upload those
      files to the FTP server.

  NOTE(review): host, user name and password are hard-coded in three session
  factory beans below; consider externalising them to a properties file.
  NOTE(review): every nested poller reuses id="poller", the same id as the
  top-level default poller. Confirm whether those ids are needed at all.
-->
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:file="http://www.springframework.org/schema/integration/file"
             xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
             xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
                                 http://www.springframework.org/schema/beans/spring-beans.xsd
                                 http://www.springframework.org/schema/integration
                                 http://www.springframework.org/schema/integration/spring-integration.xsd
                                 http://www.springframework.org/schema/integration/file
                                 http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
                                 http://www.springframework.org/schema/integration/stream
                                 http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
                                 http://www.springframework.org/schema/integration/ftp
                                 http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
                                 http://www.springframework.org/schema/context
                                 http://www.springframework.org/schema/context/spring-context.xsd">

  <!-- Default poller: one poll per second, handed off to synchTaskExecutor. -->
  <int:poller id="poller" task-executor="synchTaskExecutor" default="true" fixed-delay="1000"/>

  <!-- Session factory used by the ftpInbound adapter. -->
  <beans:bean id="ftpClientFactory"
              class="com.everge.springframework.integration.ftp.session.EvergeFtpSessionFactory">
    <beans:property name="host" value="111.93.128.170"/>
    <beans:property name="port" value="21"/>
    <beans:property name="username" value="singha"/>
    <beans:property name="password" value="singha16"/>
    <beans:property name="clientMode" value="2"/>
  </beans:bean>

  <!-- Session factory used by both FTP outbound adapters. -->
  <beans:bean id="ftpOutClientFactory"
              class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <beans:property name="host" value="111.93.128.170"/>
    <beans:property name="port" value="21"/>
    <beans:property name="username" value="singha"/>
    <beans:property name="password" value="singha16"/>
    <beans:property name="clientMode" value="2"/>
  </beans:bean>

  <!--
    Executor shared by every poller in this file: a single thread plus a
    one-slot queue. With the default abort policy a poll that arrives while
    both the thread and the queue slot are occupied is rejected, which is what
    produces the TaskRejectedException entries on errorChannel. CallerRunsPolicy
    makes the submitting thread run the poll itself instead, so polls are
    throttled rather than rejected.
  -->
  <beans:bean id="synchTaskExecutor"
              class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <beans:property name="corePoolSize" value="1"/>
    <beans:property name="maxPoolSize" value="1"/>
    <beans:property name="queueCapacity" value="1"/>
    <beans:property name="rejectedExecutionHandler">
      <beans:bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
    </beans:property>
  </beans:bean>

  <!-- Session factory used by the pqqFtpInbound adapter. -->
  <beans:bean id="pqqFtpClientFactory"
              class="com.everge.springframework.integration.ftp.session.PqqEvergeFtpSessionFactory">
    <beans:property name="host" value="111.93.128.170"/>
    <beans:property name="port" value="21"/>
    <beans:property name="username" value="singha"/>
    <beans:property name="password" value="singha16"/>
    <beans:property name="clientMode" value="2"/>
  </beans:bean>

  <!--
    Queue channel fed by both FTP inbound adapters.
    NOTE(review): no consumer of ftpChannel is declared in this file and the
    queue has no capacity limit. Confirm that something drains it.
  -->
  <int:channel id="ftpChannel">
    <int:queue/>
  </int:channel>

  <!-- Local-file filter applied by both FTP inbound adapters. -->
  <beans:bean id="acceptAllFileListFilter" class="com.everge.file.processing.EvergeFileListFilter"/>

  <!-- Handler for files arriving in the pqqReq directory. -->
  <beans:bean id="pqqHandler" class="com.everge.pqq.PqqFileHandler">
    <beans:property name="config" ref="baseConfig"/>
  </beans:bean>

  <!-- Handler for files arriving in the f directory (prototype scoped). -->
  <beans:bean id="handler" scope="prototype" class="com.everge.integration.client.FileHandler">
    <beans:property name="config" ref="baseConfig"/>
  </beans:bean>

  <beans:bean id="baseConfig" class="com.everge.config.BaseConfig"/>
  <beans:bean id="ftpSplitter" class="com.everge.service.FtpSplitter"/>
  <beans:bean id="fileSplitter" class="com.everge.file.processing.FileSplitter"/>

  <!-- Uploads files from channel "pl" to /ADPWG/PRCSD1, with retry advice. -->
  <int-ftp:outbound-channel-adapter id="notifFtpOutBound"
                                    channel="pl"
                                    remote-directory="/ADPWG/PRCSD1"
                                    session-factory="ftpOutClientFactory"
                                    auto-startup="true">
    <int-ftp:request-handler-advice-chain>
      <int:retry-advice/>
    </int-ftp:request-handler-advice-chain>
  </int-ftp:outbound-channel-adapter>

  <!-- Uploads files from channel "pqqOutputFileChannel" to /ADPWG/PRCSD, with retry advice. -->
  <int-ftp:outbound-channel-adapter id="ftpOutbound"
                                    channel="pqqOutputFileChannel"
                                    remote-directory="/ADPWG/PRCSD"
                                    session-factory="ftpOutClientFactory"
                                    auto-startup="true">
    <int-ftp:request-handler-advice-chain>
      <int:retry-advice/>
    </int-ftp:request-handler-advice-chain>
  </int-ftp:outbound-channel-adapter>

  <!-- Polls the local "f" directory; files go to the "handler" bean. -->
  <file:inbound-channel-adapter id="filesIn1"
                                prevent-duplicates="false"
                                directory="file:/Users/abhisheksingh/ddrive/everge_ws/f"
                                auto-startup="true">
    <int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="1000"/>
  </file:inbound-channel-adapter>

  <int:service-activator input-channel="filesIn1" ref="handler"/>

  <!-- Polls the local "pqqReq" directory; files go to the "pqqHandler" bean. -->
  <file:inbound-channel-adapter id="pqqInputFileChannel"
                                prevent-duplicates="false"
                                directory="file:/Users/abhisheksingh/ddrive/everge_ws/pqqReq"
                                auto-startup="true">
    <int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="1000"/>
  </file:inbound-channel-adapter>

  <int:service-activator input-channel="pqqInputFileChannel" ref="pqqHandler"/>

  <!-- Polls the local "pqqResp" directory every 10 seconds; consumed by ftpOutbound. -->
  <file:inbound-channel-adapter id="pqqOutputFileChannel"
                                directory="/Users/abhisheksingh/ddrive/everge_ws/pqqResp">
    <int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="10000"/>
  </file:inbound-channel-adapter>

  <!-- Synchronises remote files into the local "f" directory; remote files are kept. -->
  <int-ftp:inbound-channel-adapter id="ftpInbound"
                                   channel="ftpChannel"
                                   session-factory="ftpClientFactory"
                                   auto-create-local-directory="true"
                                   delete-remote-files="false"
                                   local-filter="acceptAllFileListFilter"
                                   local-directory="file:/Users/abhisheksingh/ddrive/everge_ws/f"
                                   auto-startup="true">
    <int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="1000"/>
  </int-ftp:inbound-channel-adapter>

  <!-- Synchronises remote files into the local "pqqReq" directory; remote files are kept. -->
  <int-ftp:inbound-channel-adapter id="pqqFtpInbound"
                                   channel="ftpChannel"
                                   session-factory="pqqFtpClientFactory"
                                   auto-create-local-directory="true"
                                   delete-remote-files="false"
                                   local-filter="acceptAllFileListFilter"
                                   local-directory="file:/Users/abhisheksingh/ddrive/everge_ws/pqqReq"
                                   auto-startup="true">
    <int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="1000"/>
  </int-ftp:inbound-channel-adapter>

  <!-- Polls the local "notifFile" directory; consumed by notifFtpOutBound. -->
  <file:inbound-channel-adapter id="pl"
                                directory="file:/Users/abhisheksingh/ddrive/everge_ws/notifFile"
                                auto-startup="true">
    <int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="1000"/>
  </file:inbound-channel-adapter>

</beans:beans>

So, there is an FTP location that I am polling, and each polled file is placed in the following directory on my local machine -

/Users/abhisheksingh/ddrive/test/e

Now sometimes, because I have found a bug and need to fix it, I stop the Tomcat server. I delete the files from my local directory so that the same file can be polled again the next time I start the server. But I found that the same file gets polled again even though my server is stopped! As far as I know, this should not happen. That is why I have posted my application.xml file: to find out whether anything in it is keeping threads alive. Or does spring-integration-ftp start a daemon thread that is independent of the application? Please help me solve this riddle.

In tomcat logs, I see the following exception -

 Feb 17, 2017 11:49:24 PM org.apache.catalina.loader.WebappClassLoaderBase loadClass INFO: Illegal access: this web application instance has been stopped already. Could not load UNIX Type: L8. The eventual following stack trace is caused by an error thrown for debugging purposes as well as to attempt to terminate the thread which caused the illegal access, and has no functional impact. java.lang.IllegalStateException at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1777) at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1735) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.commons.net.ftp.parser.DefaultFTPFileEntryParserFactory.createFileEntryParser(DefaultFTPFileEntryParserFactory.java:88) at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:2263) at org.apache.commons.net.ftp.FTPClient.listFiles(FTPClient.java:2046) at org.springframework.integration.ftp.session.FtpSession.list(FtpSession.java:70) at org.springframework.integration.ftp.session.FtpSession.list(FtpSession.java:43) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:236) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:232) at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:435) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:232) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:193) at 
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:59) at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134) at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224) at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245) at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353) at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55) at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 

At this point, I'm pretty sure that the spring-integration-ftp poller is preventing Tomcat from shutting down cleanly.

I investigated further using jvisualvm. I can see that task-scheduler threads have been started, and they are not terminated when ./shutdown.sh is called to stop Tomcat.

enter image description here

Interestingly, if I remove either the pl or the pqqOutputFileChannel adapter, these scheduler threads are not created.

Here is the structure of my ear file -

enter image description here

The following output keeps being written to the log file, even after shutdown has been triggered on Tomcat -

541576 DEBUG osicPublishSubscribeChannel - postSend (sent = true) on channel 'errorChannel', message: ErrorMessage [payload = org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6d6033da[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1114]] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@570e117d, headers = {id = 71d77a8b-17ea-7011-3cb3-ddbf7591321f, timestamp = 1487870801603}]
541575 DEBUG osicPublishSubscribeChannel - postSend (sent = true) on channel 'errorChannel', message: ErrorMessage [payload = org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6d6033da[Running, pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 1113]] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@44210f79, headers = {id = 74a26d65-4b42-da1f-cdf1-b77fea8bfdb7, timestamp = 1487870801603}]
541576 ERROR osihandler.LoggingHandler - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6d6033da[Running, pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 1113]] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@7d7cc2a4 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:296) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.integration.util.ErrorHandlingTaskExecutor$1@7d7cc2a4 rejected from java.util.concurrent.ThreadPoolExecutor@6d6033da[Running, pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 1113] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 11 more

** 541576 DEBUG osicPublishSubscribeChannel - postSend (sent = true) on the 'errorChannel' channel, message: ErrorMessage [payload = org.springframework.core.task.TaskRejectedException: Executor [ java.util.concurrent.ThreadPoolExecutor@6d6033da [Running, pool size = 1, active threads = 0, tasks set = 1, completed tasks = 1113]] do not accept the task: o rg.springframework.integration.util.ErrorHandlingTaskExecutor$1@ 7d7cc2a4, Headers = {ID = 26e6de67-2b70-cd7d-0c64-d21e1f8d1726, timestamp = 1487870801603}] 541611 DEBUG cesifsPtqqver - to the server [111.93.128.170:21] 541638 INFO cesifsPqqEvergeFtpSessionFactory - Inside postProcessClientAfterConnect from PqqEvergeFtpSessionFactory 541778 INFO osiftp.session.FtpSession - FileTef1FQF1f1fpfrent1 check if the file has already been processed / Users / abhisheksingh / ddrive / everge _ws / pqqArchive / TEST4.PQQ 541779 INFO cefpEvergeFileListFilter - the name of the archive file 541779 INFO cefpEvergeFileListFilter - Input file name - TEST4FEFerFerlfer1f1ferferterf1feristrefer77ferferfisterfilter77177ferferfisterfisterfilter77f77ferferfisterfisterfilter77177ferferfisterfisterfilter77177ferferferisterfilter77ferferfisterfilfisterfilter77ferfilterfisterfilferfilterfisterfilter The input file name is TEST4 541779 INFO cefpEvergeFileListFilter - the file has already been processed TEST4 541779 ERROR cefpEvergeFileListFilter - the PQQ file has already been processed. 541779 DEBUG osieSourcePollingChannelAdapter - message was not received during the poll returning "false" 541977 DEBUG osieSourcePollingChannelAdapter - message was not received during the poll returning "false"

+6
source share

Source: https://habr.com/ru/post/1014805/


All Articles