How to properly wait for an Apache Spark job, launched via SparkLauncher from another application, to finish?

I am trying to avoid a "while (true)" polling solution while waiting for my Apache Spark job to finish, but so far without success.

I have a Spark application that is supposed to process some data and put the result into a database. I launch it from my Spring service and want to wait until the work is done.
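For clarity, this is roughly the kind of busy-wait loop I am trying to avoid (a rough sketch, not my actual code; sparkAppHandle stands for the handle returned by startApplication()):

    // Naive polling of the handle's state; assumes the enclosing method may throw InterruptedException
    while (!sparkAppHandle.getState().isFinal()) {
        Thread.sleep(1000);
    }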

Example:

The launcher's run method:

@Override
public void run(UUID docId, String query) throws Exception {
    launcher.addAppArgs(docId.toString(), query);
    SparkAppHandle sparkAppHandle = launcher.startApplication();
    sparkAppHandle.addListener(new SparkAppHandle.Listener() {
        @Override
        public void stateChanged(SparkAppHandle handle) {
            System.out.println(handle.getState() + " new state");
        }

        @Override
        public void infoChanged(SparkAppHandle handle) {
            System.out.println(handle.getState() + " new state");
        }
    });
    System.out.println(sparkAppHandle.getState().toString());
}

How can I wait until the handle reaches a completed state?

2 answers

I also use SparkLauncher from a Spring application. Below is a brief overview of the approach I took (following the examples in the JavaDoc).

The @Service used to run the job also implements SparkAppHandle.Listener and passes a reference to itself via .startApplication, for example:

...

@Service
public class JobLauncher implements SparkAppHandle.Listener {

    ...

    private SparkAppHandle launchJob(String mainClass, String[] args) throws Exception {
        String appResource = getAppResourceName();
        SparkAppHandle handle = new SparkLauncher()
                .setAppResource(appResource)
                .addAppArgs(args)
                .setMainClass(mainClass)
                .setMaster(sparkMaster)
                .setDeployMode(sparkDeployMode)
                .setSparkHome(sparkHome)
                .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
                .startApplication(this);

        LOG.info("Launched [" + mainClass + "] from [" + appResource + "] State [" + handle.getState() + "]");
        return handle;
    }

    /**
     * Callback method for changes to the Spark job info
     */
    @Override
    public void infoChanged(SparkAppHandle handle) {
        LOG.info("Spark App Id [" + handle.getAppId() + "] Info Changed. State [" + handle.getState() + "]");
    }

    /**
     * Callback method for changes to the Spark job state
     */
    @Override
    public void stateChanged(SparkAppHandle handle) {
        LOG.info("Spark App Id [" + handle.getAppId() + "] State Changed. State [" + handle.getState() + "]");
    }

    ...
}

With this approach, you can take action when the state changes to "FAILED", "FINISHED" or "KILLED".
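If the calling thread actually needs to block until one of those final states is reached, the listener can be combined with a simple monitor. This sketch is not part of my original code; the lock field and the waitForCompletion helper are just illustrative names:

    private final Object lock = new Object();

    @Override
    public void stateChanged(SparkAppHandle handle) {
        LOG.info("Spark App Id [" + handle.getAppId() + "] State Changed. State [" + handle.getState() + "]");
        if (handle.getState().isFinal()) {
            // FINISHED, FAILED or KILLED: wake up any thread blocked in waitForCompletion()
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }

    // The calling service can block on this after launchJob(...)
    public void waitForCompletion(SparkAppHandle handle) throws InterruptedException {
        synchronized (lock) {
            while (!handle.getState().isFinal()) {
                lock.wait();
            }
        }
    }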

I hope this information helps you.


I implemented this using a CountDownLatch, and it works as expected.

    ...
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);
    SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
    Thread sparkAppListenerThread = new Thread(sparkAppListener);
    sparkAppListenerThread.start();
    long timeout = 120;
    countDownLatch.await(timeout, TimeUnit.SECONDS);
    ...

    private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {

        private static final Log log = LogFactory.getLog(SparkAppListener.class);

        private final CountDownLatch countDownLatch;

        public SparkAppListener(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void stateChanged(SparkAppHandle handle) {
            String sparkAppId = handle.getAppId();
            State appState = handle.getState();
            if (sparkAppId != null) {
                log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
            } else {
                log.info("Spark job state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
            }
            if (appState != null && appState.isFinal()) {
                countDownLatch.countDown();
            }
        }

        @Override
        public void infoChanged(SparkAppHandle handle) {}

        @Override
        public void run() {}
    }
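One detail worth noting: await(timeout, TimeUnit) returns a boolean, so the caller can tell a timeout apart from real completion and then inspect the handle's final state. A small sketch building on the variables above (the stop()/kill() choice is just an example):

    boolean reachedFinalState = countDownLatch.await(timeout, TimeUnit.SECONDS);
    if (!reachedFinalState) {
        // The job did not reach a final state within the timeout
        appHandle.stop(); // or appHandle.kill() for a forceful shutdown
    } else if (appHandle.getState() != SparkAppHandle.State.FINISHED) {
        // The job ended, but in FAILED or KILLED state
        throw new IllegalStateException("Spark job ended with state " + appHandle.getState());
    }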
