Starting a local cluster under a non-default class loader

Local Cluster from Web Class Loader

I am trying to start a local cluster from a web container (yes, this is only for dev and testing purposes) and I am having difficulty with class loaders.

Direct approach

When I do it the simple, recommended way,

    ILocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology(topologyName, stormConf, topology);

I am rewarded with:

    Async loop died!: java.lang.ClassCastException: my.company.storm.bolt.SomeFilteringBolt cannot be cast to org.apache.storm.task.IBolt
        at org.apache.storm.daemon.executor$fn__7953$fn__7966.invoke(executor.clj:787)
        at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:482)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.lang.Thread.run(Thread.java:745)

This is because the class loader used to load and instantiate the StormTopology is an instance of Jetty's WebAppClassLoader, but the (sub)process spawned by LocalCluster.submitTopology() seems to use the system class loader. I confirmed this by logging the class loader in a static block of SomeFilteringBolt: the class really is loaded twice, and the bolt from the WebAppClassLoader obviously cannot be cast to the bolt type from the system class loader later.
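The check described above can be sketched with the JDK alone. `ClassLoaderDiagnostics` and its methods are hypothetical names invented for this illustration, not part of Storm or Jetty.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for checking which class loader defined a class. */
final class ClassLoaderDiagnostics {

    private ClassLoaderDiagnostics() {}

    /** Returns the defining loader of the given class, followed by its parents. */
    static List<String> loaderChain(Class<?> type) {
        List<String> chain = new ArrayList<>();
        ClassLoader loader = type.getClassLoader();
        if (loader == null) {
            // A null defining loader means the bootstrap loader defined the class.
            chain.add("bootstrap");
            return chain;
        }
        while (loader != null) {
            chain.add(loader.getClass().getName() + "@"
                    + Integer.toHexString(System.identityHashCode(loader)));
            loader = loader.getParent();
        }
        return chain;
    }

    /** True only if both classes share a name and a defining loader. */
    static boolean sameRuntimeClass(Class<?> a, Class<?> b) {
        return a.getName().equals(b.getName())
                && a.getClassLoader() == b.getClassLoader();
    }
}
```

Calling `System.err.println(ClassLoaderDiagnostics.loaderChain(SomeFilteringBolt.class))` from a static block of the bolt would print one chain per loaded copy of the class. Two classes with the same name but different defining loaders are distinct types to the JVM, which is what produces the ClassCastException.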

Expected Behavior

This surprises me, because I thought that Storm would serialize the StormTopology instance, "send" it locally, deserialize it, and run it. If it did, this would definitely work. Instead, it seems to use the provided StormTopology instance directly, which is problematic when that instance was created under a different class loader.
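To illustrate the round trip expected here, the following is a minimal JDK-only sketch using plain Java serialization. It is not Storm's code: `LoaderAwareSerialization` and both methods are hypothetical, and how Storm itself handles the topology in local mode is not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper: serialize an object, then rebuild it under a chosen loader. */
final class LoaderAwareSerialization {

    private LoaderAwareSerialization() {}

    static byte[] serialize(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object deserialize(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                try {
                    // Resolve every class in the stream through the target loader.
                    return Class.forName(desc.getName(), false, loader);
                } catch (ClassNotFoundException e) {
                    // Fall back to the default lookup (covers primitive types).
                    return super.resolveClass(desc);
                }
            }
        }) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The copy returned by `deserialize` is built from classes resolved through the given loader, so code running under that loader can cast it without a class identity mismatch.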

What I tried

I tried setting TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE to true in order to force Storm to serialize my topology locally. No change.

I tried to run LocalCluster under the system class loader:

    ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
        Config topologyConf = createTopologyConfig();
        Map<String, Object> stormConf = createStormConfig(topologyConf);
        StormTopology topology = createTopology(topologyConf);
        ILocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, stormConf, topology);
    } finally {
        Thread.currentThread().setContextClassLoader(originalClassloader);
    }

This actually got me a little further:

    Thread died: java.lang.ExceptionInInitializerError
        at clojure.core__init.__init0(Unknown Source)
        at clojure.core__init.<clinit>(Unknown Source)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at clojure.lang.RT.classForName(RT.java:2154)
        at clojure.lang.RT.classForName(RT.java:2163)
        at clojure.lang.RT.loadClassForName(RT.java:2182)
        at clojure.lang.RT.load(RT.java:436)
        at clojure.lang.RT.load(RT.java:412)
        at clojure.lang.RT.doInit(RT.java:454)
        at clojure.lang.RT.<clinit>(RT.java:330)
        at clojure.lang.Namespace.<init>(Namespace.java:34)
        at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
        at clojure.lang.Var.internPrivate(Var.java:151)
        at org.apache.storm.LocalCluster.<clinit>(Unknown Source)
        at my.company.storm.LocalTopologyRunner.startTopology(LocalTopologyRunner.java:146)
        ... 10 more
    Caused by: java.lang.IllegalStateException: Attempting to call unbound fn: #'clojure.core/refer
        at clojure.lang.Var$Unbound.throwArity(Var.java:43)
        at clojure.lang.AFn.invoke(AFn.java:32)
        at clojure.lang.Var.invoke(Var.java:379)
        at clojure.lang.RT.doInit(RT.java:467)
        at clojure.lang.RT.<clinit>(RT.java:330)
        ... 18 more

Wat ?!
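For reference, here is a small JDK-only sketch of what swapping the context class loader does and does not change. `ContextClassLoaders.callWith` is a made-up name for this illustration.

```java
import java.util.function.Supplier;

/** Hypothetical helper: run an action with a temporary context class loader. */
final class ContextClassLoaders {

    private ContextClassLoaders() {}

    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Always restore the previous loader, even if the action throws.
            thread.setContextClassLoader(original);
        }
    }
}
```

The swap only affects code that explicitly asks for the thread's context class loader. A class referenced directly in source, such as `new LocalCluster()`, is still resolved through the loader that defined the calling class, so inside a webapp it still comes from the WebAppClassLoader. Libraries that consult the context loader would then see the system loader instead, and that mix of loaders may be related to the failed initialization above, although this is a guess.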

Question

How can I safely run the Storm topology in local mode from a class loader other than the system class loader?

I am running Apache Storm 1.0.1, Jetty 8.1, Java 8u112 x64, Windows 7 x64.

2 answers

Apache Storm 1.0.3 magically fixed this.

It works even without TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE. No fix listed in the release notes seems to cover this, so I could not trace it to a specific code change. In any case, we are happy that it works as expected.


I am not a Storm expert, but this reminds me of the old "identity crisis" problem that I have run into in the past.

Two things to try:

  • Prioritize the System class loader by calling org.eclipse.jetty.webapp.WebAppContext.setParentLoaderPriority(true)

  • If this does not work, you can call the org.eclipse.jetty.webapp.WebAppContext.setSystemClasses or org.eclipse.jetty.webapp.WebAppContext.addSystemClass methods to control which classes are considered system classes within the webapp domain.

Do this for the entire Storm package (package prefixes such as "org.apache.storm." are allowed) before those classes are loaded, that is, during webapp initialization.
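Both suggestions could look roughly like this with embedded Jetty. This is an untested sketch: the class name, WAR path and port are placeholders, and listing "clojure." as a system class is my own assumption, added because Storm 1.x depends on Clojure.

```java
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.webapp.WebAppContext;

/** Sketch of an embedded Jetty launcher; all names and paths are placeholders. */
final class EmbeddedJettyForStorm {

    public static void main(String[] args) throws Exception {
        WebAppContext webapp = new WebAppContext();
        webapp.setContextPath("/");
        webapp.setWar("path/to/webapp.war"); // placeholder path

        // Attempt 1: consult the parent (container) loader before WEB-INF/lib.
        webapp.setParentLoaderPriority(true);

        // Attempt 2: always load these packages from the parent loader.
        webapp.addSystemClass("org.apache.storm.");
        webapp.addSystemClass("clojure."); // assumption, see lead-in

        Server server = new Server(8080); // placeholder port
        server.setHandler(webapp);
        server.start();
        server.join();
    }
}
```

Either setting can only help if the Storm jars are visible to the parent loader, that is, on the container's class path and not only in WEB-INF/lib.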

Worth a try! Good luck.


Source: https://habr.com/ru/post/1263059/

