In my code, I have a number of executor services that run as a pipeline, in that a task running on the first executor service can submit tasks to any subsequent executor service, but never the other way round.
// Register the executor services in pipeline order: a task on an earlier
// service may submit work to a later one, never the reverse.
services.add(songLoaderService);
services.add(AcoustIdMatcher.getExecutorService());
services.add(SongPrematcherMatcher.getExecutorService());
services.add(MusicBrainzMetadataMatcher.getExecutorService());
// Start the pipeline with a single task on the first service.
songLoaderService.submit(loader);
We submit only one task to the first service, after which I can request a shutdown. The shutdown will not complete until that task has finished, and by then it will have put some tasks on the second service, and so on.
This code has worked for several years: shutdown() is never called on a service until all the tasks that will be submitted to it have been submitted, and awaitTermination() does not return until all the tasks that have been submitted have completed.
// Shut the services down one at a time, in registration order, blocking on
// each one until it reports termination before moving on to the next.
int count = 0;
for (ExecutorService stage : services)
{
    // The thread factory's name is only used to label the log line.
    TimeoutThreadPoolExecutor stagePool = (TimeoutThreadPoolExecutor) stage;
    SongKongThreadFactory stageFactory = (SongKongThreadFactory) stagePool.getThreadFactory();
    MainWindow.logger.severe("Requested Shutdown Task:" + count + ":" + stageFactory.getName());

    stage.shutdown();
    stage.awaitTermination(10, TimeUnit.DAYS);
    MainWindow.logger.severe("Completed Shutdown Task:" + count);

    // After the third service has terminated, dump the SongPrematcherMatcher counters.
    if (count == 2)
    {
        String prematcherReport = "Report:" + currentReportId + ":SongPreMatcher:"
                + SongPrematcherMatcher.getPipelineQueuedCount() + ":"
                + SongPrematcherMatcher.getPipelineCallCount() + ":"
                + SongPrematcherMatcher.getPipelineCompletedCount() + ":"
                + SongPrematcherMatcher.getPipelineFileCount();
        MainWindow.logger.severe(prematcherReport);
    }
    count++;
}
, ExecutorService .
SongPrematcherMatcher , , (AcoustIdMatcher), , , ,
Report:353:SongPreMatcher:init:57:started:57:Finished:56
, , , .
, , , MusicBrainzMetadataMatcher, , , (PrematcherMatched) .
PrematcherMatcher , , - , , .
toplevelanalyzer.FixSongsController:start:SEVERE: Requested Shutdown Task:0
analyser.AcoustIdMatcher:<init>:SEVERE: GROUP 115:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
toplevelanalyzer.FixSongsController:start:SEVERE: Completed Shutdown Task:0
toplevelanalyzer.FixSongsController:start:SEVERE: Requested Shutdown Task:1:analyser.AcoustIdMatcher
analyser.SongPrematcherMatcher:<init>:SEVERE: Queue:GROUP 791:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.SongPrematcherMatcher:call:SEVERE: Start:GROUP 791:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
toplevelanalyzer.FixSongsController:start:SEVERE: Completed Shutdown Task:1
toplevelanalyzer.FixSongsController:start:SEVERE: Requested Shutdown Task:2:analyser.SongPrematcherMatcher
toplevelanalyzer.FixSongsController:start:SEVERE: Completed Shutdown Task:2
toplevelanalyzer.FixSongsController:start:SEVERE: Report:353:SongPreMatcher:init:57:started:57:Finished:56
toplevelanalyzer.FixSongsController:start:SEVERE: Requested Shutdown Task:3:analyser.MusicBrainzMetadataMatcher
analyser.MusicBrainzMetadataMatcher:<init>:SEVERE: Queue:GROUP 795:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.MusicBrainzMetadataMatcher:<init>:SEVERE: Queue:GROUP 797:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.MusicBrainzMetadataMatcher:<init>:SEVERE: Queue:GROUP 799:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.MusicBrainzMetadataMatcher:<init>:SEVERE: Queue:GROUP 821:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.MusicBrainzMetadataMatcher:<init>:SEVERE: Queue:GROUP 823:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.SongPrematcherMatcher:call:SEVERE: Finish:GROUP 791:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
, , , , , , -, . , , .
// Variant of the shutdown loop that pauses before each shutdown request.
for (ExecutorService service : services)
{
// Wait five seconds before asking this service to shut down.
Thread.sleep(5000);
service.shutdown();
// Remainder of the loop body is elided in the original post.
......
}
, -. , , .
PreMatcherMatcher .
package com.jthink.songkong.analyse.analyser;
import com.jthink.songkong.analyse.general.Errors;
import com.jthink.songkong.cmdline.SongKong;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;
import java.util.List;
import java.util.concurrent.*;
/**
 * A fixed-size {@link ThreadPoolExecutor} that arms a watchdog for every task it runs.
 *
 * <p>When a task starts, a {@link TimeoutTask} is scheduled on a private single-threaded
 * scheduler; when the task finishes the watchdog is cancelled. If the watchdog fires first
 * and the task is a {@code CancelableTask}, the task is flagged as cancelled and, for a small
 * set of known-problematic stack traces, the worker thread is interrupted and finally stopped.
 *
 * <p>NOTE(review): the pool is built with {@link ThreadPoolExecutor.CallerRunsPolicy}. Per the
 * JDK documentation a rejected task is run on the submitting thread, unless this executor has
 * been shut down, in which case the task is silently discarded. Tasks run that way do not go
 * through {@link #beforeExecute}, so no watchdog is armed for them.
 */
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    /** Milliseconds to wait after interrupting a stuck worker before stopping it. */
    private final static int WAIT_BEFORE_STOP = 10000;

    private final long timeout;
    private final TimeUnit timeoutUnit;

    /** Runs the per-task watchdogs; shut down together with this pool. */
    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    /** Watchdog currently armed for each running task, keyed by the task itself. */
    private final ConcurrentMap<Runnable, ScheduledFuture<?>> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture<?>>();

    /**
     * @return the per-task timeout; a value of zero or less disables the watchdog
     */
    public long getTimeout()
    {
        return timeout;
    }

    /**
     * @return the unit in which {@link #getTimeout()} is expressed
     */
    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    /**
     * Creates a pool whose core and maximum size are both {@code workerSize}.
     *
     * @param workerSize    number of worker threads
     * @param threadFactory factory used to create the worker threads
     * @param queue         work queue holding tasks that are waiting for a worker
     * @param timeout       how long a task may run before its watchdog fires; {@code <= 0} disables it
     * @param timeoutUnit   unit of {@code timeout}
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue,long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /**
     * Stops the watchdog scheduler as well as the pool itself.
     *
     * @return the tasks that were queued but never started
     */
    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    /**
     * Wraps submitted callables in a {@code FutureCallable} so that the watchdog can later
     * recover the original callable from the runnable handed to {@link #beforeExecute}.
     */
    @Override
    public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
        return new FutureCallable<T>(callable);
    }

    /**
     * Logs the start of the task, checks in with {@code SongKong} and, when a positive
     * timeout is configured, arms a watchdog for the task.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        MainWindow.logger.warning("beforeExecute:"+t.getName()+":"+r.toString());
        SongKong.checkIn();
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t,r), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
        // ThreadPoolExecutor asks subclasses to chain to super at the end of this hook.
        super.beforeExecute(t, r);
    }

    /**
     * Logs completion of the task and disarms its watchdog, if one was armed.
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // ThreadPoolExecutor asks subclasses to chain to super at the start of this hook.
        super.afterExecute(r, t);
        MainWindow.logger.warning("afterExecute:"+r.toString());
        ScheduledFuture<?> timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            // No need to interrupt a watchdog that is already running.
            timeoutTask.cancel(false);
        }
    }

    /**
     * Logs termination of the pool and shuts the watchdog scheduler down.
     */
    @Override
    protected void terminated()
    {
        MainWindow.logger.warning("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
        timeoutExecutor.shutdown();
        super.terminated();
    }

    /**
     * Watchdog that fires when a task has been running for longer than the configured timeout.
     */
    class TimeoutTask implements Runnable {
        /** Worker thread that is executing the watched task. */
        private final Thread thread;

        /** The original callable, or {@code null} when the runnable is not a {@code FutureCallable}. */
        private Callable<?> c;

        public TimeoutTask(Thread thread, Runnable c) {
            this.thread = thread;
            if(c instanceof FutureCallable)
            {
                this.c = ((FutureCallable<?>) c).getCallable();
            }
        }

        /**
         * Flags the watched task as cancelled and records an error. If the worker's stack trace
         * shows one of the known-problematic classes, the worker is also interrupted and, if it
         * is still alive after {@code WAIT_BEFORE_STOP} milliseconds, stopped.
         * Does nothing when the watched task is not a {@code CancelableTask}.
         */
        @Override
        public void run()
        {
            String msg = describeTask();
            // instanceof is false for null, so this also covers "no callable recovered".
            if (!(c instanceof CancelableTask))
            {
                return;
            }

            MainWindow.logger.warning("+++Cancelling " + msg + " task because taking too long");
            ((CancelableTask) c).setCancelTask(true);
            StackTraceElement[] stackTrace = thread.getStackTrace();
            Errors.addError("Cancelled " + msg + " because taken too long", stackTrace);
            Counters.getErrors().getCounter().incrementAndGet();

            if (!isKnownProblem(stackTrace))
            {
                return;
            }

            MainWindow.logger.warning("+++Interrupting " + msg + " task because taking too long");
            thread.interrupt();
            try
            {
                Thread.sleep(WAIT_BEFORE_STOP);
            }
            catch (InterruptedException ie)
            {
                MainWindow.logger.warning("+++Interrupted TimeoutTask " + msg + " task because taking too long");
                // Restore the interrupt status so the scheduler thread can still observe it.
                Thread.currentThread().interrupt();
            }
            if(thread.isAlive())
            {
                MainWindow.logger.warning("+++Stopping CosineSimailarity task");
                // NOTE(review): Thread.stop() is deprecated, and recent JDKs (20+) document it
                // as throwing UnsupportedOperationException - confirm the target runtime.
                thread.stop();
            }
        }

        /**
         * Builds the label used in log and error messages for the watched task.
         *
         * @return the task's class plus its song-group key or song id where available,
         *         or an empty string when no callable was recovered
         */
        private String describeTask()
        {
            String msg = "";
            if (c != null)
            {
                if (c instanceof AcoustIdMatcher)
                {
                    msg = c.getClass() + ":" + ((AcoustIdMatcher) c).getSongGroup().getKey();
                }
                else if (c instanceof SongPrematcherMatcher)
                {
                    msg = c.getClass() + ":" + ((SongPrematcherMatcher) c).getSongGroup().getKey();
                }
                else if (c instanceof MusicBrainzSongGroupMatcher)
                {
                    msg = c.getClass() + ":" + ((MusicBrainzSongGroupMatcher) c).getSongGroup().getKey();
                }
                else if (c instanceof MusicBrainzMetadataMatcher)
                {
                    msg = c.getClass() + ":" + ((MusicBrainzMetadataMatcher) c).getSongGroup().getKey();
                }
                else if (c instanceof MusicBrainzUpdateSongOnly)
                {
                    msg = c.getClass() + ":" + ((MusicBrainzUpdateSongOnly) c).getSongGroup().getKey();
                }
                else if (c instanceof DiscogsSongGroupMatcher)
                {
                    msg = c.getClass() + ":" + ((DiscogsSongGroupMatcher) c).getSongGroup().getKey();
                }
                else if (c instanceof MusicBrainzSongMatcher)
                {
                    msg = c.getClass() + ":" + String.valueOf(((MusicBrainzSongMatcher) c).getSongId());
                }
                else if (c instanceof SongSaver)
                {
                    msg = c.getClass() + ":" + String.valueOf(((SongSaver) c).getSongId());
                }
                else
                {
                    msg = c.getClass().getName();
                }
            }
            return msg;
        }

        /**
         * @param stackTrace stack trace of the worker thread at the time the watchdog fired
         * @return {@code true} when any frame belongs to a class known to get stuck
         */
        private boolean isKnownProblem(StackTraceElement[] stackTrace)
        {
            for (StackTraceElement frame : stackTrace)
            {
                String className = frame.getClassName();
                if (className.contains("CosineSimilarity")
                        || className.contains("com.jthink.songkong.fileloader.FileFilters"))
                {
                    return true;
                }
            }
            return false;
        }
    }
}
/**
 * Holds the executor service for one named thread group and creates it on demand.
 */
public class AnalyserService
{
    /** Capacity of the work queue given to the executor. */
    protected static final int BOUNDED_QUEUE_SIZE = 500;

    /** Name handed to the {@code SongKongThreadFactory} that creates the worker threads. */
    protected String threadGroup;

    /** Current executor; {@code null} until {@link #getExecutorService()} is first called. */
    protected ExecutorService executorService;

    /**
     * @param threadGroup name passed to the thread factory of the executor
     */
    public AnalyserService(String threadGroup)
    {
        this.threadGroup = threadGroup;
    }

    /**
     * Replaces {@link #executorService} with a new {@code PausableExecutor} that has one
     * worker per available processor and a queue bounded at {@link #BOUNDED_QUEUE_SIZE}.
     */
    protected void initExecutorService()
    {
        final int poolSize = Runtime.getRuntime().availableProcessors();
        final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE);
        executorService = new PausableExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, workQueue, new SongKongThreadFactory(threadGroup));
    }

    /**
     * Returns the current executor, creating a new one first when none exists yet or the
     * existing one has been shut down.
     *
     * @return an executor that was not shut down at the time of the check
     */
    public ExecutorService getExecutorService()
    {
        final boolean usable = executorService != null && !executorService.isShutdown();
        if (!usable)
        {
            initExecutorService();
        }
        return executorService;
    }

    /**
     * Submits the task to the current executor. Uses the field directly, so unlike
     * {@link #getExecutorService()} it never creates or replaces the executor.
     *
     * @param task the task to run
     */
    public void submit(Callable<Boolean> task)
    {
        executorService.submit(task);
    }
}
/**
 * An {@link AnalyserService} whose executor is a {@code TimeoutThreadPoolExecutor}
 * configured with a per-task timeout.
 */
public class AnalyserServiceWithTimeout extends AnalyserService
{
    /** Per-task timeout; passed to the executor together with {@link TimeUnit#MINUTES}. */
    private static final int TIMEOUT_PER_TASK = 30;

    /**
     * @param threadGroup name passed to the thread factory of the executor
     */
    public AnalyserServiceWithTimeout(String threadGroup)
    {
        super(threadGroup);
    }

    /**
     * Replaces the inherited executor with a {@code TimeoutThreadPoolExecutor} that has one
     * worker per available processor and a queue bounded at {@code BOUNDED_QUEUE_SIZE}.
     */
    @Override
    protected void initExecutorService()
    {
        final int poolSize = Runtime.getRuntime().availableProcessors();
        final SongKongThreadFactory workerFactory = new SongKongThreadFactory(threadGroup);
        final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE);
        executorService = new TimeoutThreadPoolExecutor(poolSize, workerFactory, workQueue, TIMEOUT_PER_TASK, TimeUnit.MINUTES);
    }
}
package com.jthink.songkong.analyse.analyser;
import com.google.common.base.Strings;
import com.jthink.songkong.analyse.general.Errors;
import com.jthink.songkong.cmdline.SongKong;
import com.jthink.songkong.db.SongCache;
import com.jthink.songkong.match.MetadataGatherer;
import com.jthink.songkong.preferences.UserPreferences;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadGroup;
import com.jthink.songlayer.Song;
import com.jthink.songlayer.hibernate.HibernateUtil;
import org.hibernate.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
/**
 * Pipeline task that loads the songs of a {@code SongGroup}, runs
 * {@code RecordingOnlyMatcher.matchRecordingsOnlyByAcoustid} on them, then regroups the songs
 * with a {@code MetadataGatherer} and submits follow-up tasks to later services.
 *
 * <p>Static counters record how many instances were created, started and completed.
 */
public class SongPrematcherMatcher extends CancelableTask implements Callable<Boolean> {
    private static PipelineCount pipelineCount = new PipelineCount();

    /** @return number of tasks constructed since the counters were last reset */
    public static int getPipelineQueuedCount()
    {
        return pipelineCount.getQueuedCount();
    }

    /** @return number of tasks whose {@link #call()} got past the stop/cancel check */
    public static int getPipelineCallCount()
    {
        return pipelineCount.getCallCount();
    }

    /** Resets all pipeline counters. */
    public static void resetPipelineCount()
    {
        pipelineCount.resetCounts();
    }

    /** @return total number of song ids in the groups of all constructed tasks */
    public static int getPipelineFileCount()
    {
        return pipelineCount.getFileCount();
    }

    /** @return number of tasks whose {@link #call()} ran to successful completion */
    public static int getPipelineCompletedCount()
    {
        return pipelineCount.getCompletedCount();
    }

    private static AnalyserService analyserService = new AnalyserServiceWithTimeout(SongKongThreadGroup.THREAD_PREMATCHER_WORKER);

    /** Hibernate session used by {@link #call()}; {@code null} until a transaction is begun. */
    private Session session;
    private SongGroup songGroup;

    public SongGroup getSongGroup()
    {
        return songGroup;
    }

    /**
     * Creates the task and counts it, and its songs, as queued.
     *
     * @param songGroup the group of songs this task will process
     */
    public SongPrematcherMatcher(SongGroup songGroup)
    {
        SongKong.logger.severe("Queue:"+ songGroup.getKey());
        pipelineCount.incQueuedCount();
        pipelineCount.incFileCount(songGroup.getSongIds().size());
        this.songGroup = songGroup;
    }

    /** @return the executor service of this stage, as provided by its {@code AnalyserService} */
    public static ExecutorService getExecutorService()
    {
        return analyserService.getExecutorService();
    }

    /** @return the {@code AnalyserService} through which tasks of this stage are submitted */
    public static AnalyserService getService()
    {
        return analyserService;
    }

    /**
     * Runs the prematch for this task's song group.
     *
     * @return {@code true} when the group was processed; {@code false} when the task was
     *         stopped or cancelled before it started, or when any throwable was raised
     */
    public Boolean call()
    {
        // Set when the failure handler has run, so the cleanup below knows not to commit.
        boolean failed = false;
        try
        {
            SongKong.logger.severe("Start:" + songGroup.getKey());
            if (SongKong.isStopTask() || isCancelTask())
            {
                // No session has been opened yet; the finally block must cope with that.
                return false;
            }
            SongKong.checkIn();
            pipelineCount.incCallCount();
            session = HibernateUtil.beginTransaction();
            AnalysisStats stats = new AnalysisStats();
            List<Song> songs = SongCache.loadSongsFromDatabase(session, songGroup.getSongIds());
            try
            {
                new RecordingOnlyMatcher().matchRecordingsOnlyByAcoustid(session, songGroup, songs, stats);
            }
            catch(Exception ex)
            {
                // A matcher failure is recorded but does not abort the task.
                MainWindow.logger.log(Level.SEVERE, Strings.nullToEmpty(ex.getMessage()), ex);
                Errors.addError(Strings.nullToEmpty(ex.getMessage()));
            }
            session.getTransaction().commit();
            HibernateUtil.closeSession(session);
            processSongsWithNewMetadata(songGroup, songs);
            pipelineCount.incCompletedCount();
            SongKong.logger.severe("Finish:" + songGroup.getKey());
            return true;
        }
        catch (Throwable t)
        {
            // Exceptions and Errors alike are logged and turned into a false result.
            failed = true;
            SongKong.logger.severe("FinishFail:" + songGroup.getKey());
            MainWindow.logger.log(Level.SEVERE, "SongPrematcherMatcher:" + t.getMessage(), t);
            rollbackIfSessionOpen();
            return false;
        }
        finally
        {
            // session is null when the task returned early or beginTransaction itself failed.
            if (session != null && session.isOpen())
            {
                // A transaction that was just rolled back must not be committed;
                // in that case only release the session.
                if (!failed)
                {
                    session.getTransaction().commit();
                }
                HibernateUtil.closeSession(session);
            }
        }
    }

    /**
     * Rolls the current transaction back when there is an open session that has one.
     * Does nothing when no session was opened or it has already been closed.
     */
    private void rollbackIfSessionOpen()
    {
        if (session != null && session.isOpen() && session.getTransaction() != null)
        {
            session.getTransaction().rollback();
        }
    }

    /**
     * Regroups the songs by album using a {@code MetadataGatherer} and hands each grouping,
     * plus the songs without a release, to the next stage. Logs and records an error when
     * fewer songs were handed on than were passed in.
     *
     * @param songGroup the group the songs came from
     * @param songs     the songs loaded for that group
     * @return always {@code true}
     */
    private boolean processSongsWithNewMetadata(SongGroup songGroup, List<Song> songs)
    {
        MainWindow.logger.info("Prematcher:" + songGroup.getKey() + ":totalcount:" + songs.size());
        int count = 0;
        MetadataGatherer mg = new MetadataGatherer(songs);
        for (String album : mg.getAlbums().keySet())
        {
            List<Song> songsInGrouping = mg.getAlbums().get(album);
            count+=songsInGrouping.size();
            MainWindow.logger.warning("Prematcher:" + songGroup.getKey() + ":" + album + ":count:" + songsInGrouping.size());
            SongGroup sg = SongGroup.createSongGroupForSongs(songGroup, songsInGrouping);
            sg.setRandomFolderNoMetadata(false);
            sg.setRandomFolder(false);
            processRandomFolder(sg, songsInGrouping);
        }
        List<Song> songsWithNoInfo = new ArrayList<>(mg.getSongsWithNoRelease());
        if(songsWithNoInfo.size()>0)
        {
            count+=songsWithNoInfo.size();
            SongGroup sgWithNoInfo = SongGroup.createSongGroupForSongs(songGroup, songsWithNoInfo);
            MainWindow.logger.warning("Prematcher:" + songGroup.getKey() + ":NoMetadata:" + ":count:" + songsWithNoInfo.size());
            processRandomFolderNoMetadata(sgWithNoInfo, songsWithNoInfo);
        }
        if(count<songs.size())
        {
            MainWindow.logger.warning(songGroup.getKey()+":Not all songs have been processed"+songs.size());
            Errors.addErrorWithoutStackTrace(songGroup.getKey()+":Not all songs have been processed:"+songs.size());
        }
        return true;
    }

    /**
     * Submits a follow-up task for a grouping that has album metadata. The target depends on
     * user preferences: MusicBrainz if enabled, otherwise Discogs if enabled (choosing the
     * matcher by sub-group and song count), otherwise one {@code SongSaver} per song id.
     *
     * @param songGroup the grouping to hand on
     * @param songs     the songs in that grouping (not read by this method)
     * @return always {@code true}
     */
    private boolean processRandomFolder(SongGroup songGroup, List<Song> songs)
    {
        if(UserPreferences.getInstance().isSearchMusicBrainz())
        {
            MusicBrainzMetadataMatcher.getService().submit(new MusicBrainzMetadataMatcher(songGroup));
        }
        else if(UserPreferences.getInstance().isSearchDiscogs())
        {
            if(songGroup.getSubSongGroups().size() > 1)
            {
                DiscogsMultiFolderSongGroupMatcher.getService().submit(new DiscogsMultiFolderSongGroupMatcher(songGroup));
            }
            else if(songGroup.getSongIds().size()==1)
            {
                DiscogsSongMatcher.getService().submit(new DiscogsSongMatcher(songGroup, songGroup.getSongIds().get(0)));
            }
            else
            {
                DiscogsSongGroupMatcher.getService().submit(new DiscogsSongGroupMatcher(songGroup));
            }
        }
        else
        {
            for (Integer songId : songGroup.getSongIds())
            {
                SongSaver.getService().submit(new SongSaver(songId));
            }
        }
        return true;
    }

    /**
     * Submits follow-up tasks for songs that have no release metadata, one task per song:
     * a MusicBrainz song match if enabled, otherwise a Discogs song match if enabled,
     * otherwise a {@code SongSaver}.
     *
     * @param songGroup the grouping the songs belong to
     * @param songs     the songs without release metadata
     * @return always {@code true}
     */
    private boolean processRandomFolderNoMetadata(SongGroup songGroup, List<Song> songs)
    {
        if(UserPreferences.getInstance().isSearchMusicBrainz())
        {
            for (Song song : songs)
            {
                MusicBrainzSongMatcher.getService().submit(new MusicBrainzSongMatcher(songGroup, song.getRecNo()));
            }
        }
        else if(UserPreferences.getInstance().isSearchDiscogs())
        {
            for (Song song : songs)
            {
                DiscogsSongMatcher.getService().submit(new DiscogsSongMatcher(songGroup, song.getRecNo()));
            }
        }
        else
        {
            for (Integer songId : songGroup.getSongIds())
            {
                SongSaver.getService().submit(new SongSaver(songId));
            }
        }
        return true;
    }
}