Running a Storm LocalCluster from a web application class loader
I am trying to start a local cluster from a web container (yes, this is only for dev and testing purposes) and I am having difficulty with class loaders.
Direct approach
When I do it the simple, recommended way:
ILocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topologyName, stormConf, topology);
I get this in return:
Async loop died!: java.lang.ClassCastException: my.company.storm.bolt.SomeFilteringBolt cannot be cast to org.apache.storm.task.IBolt
    at org.apache.storm.daemon.executor$fn__7953$fn__7966.invoke(executor.clj:787)
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:482)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
This is because the class loader used to load and instantiate StormTopology is an instance of Jetty's WebAppClassLoader, but the (sub)process spawned by LocalCluster.submitTopology() seems to use the system class loader. I confirmed this by logging the class loader in a static block of SomeFilteringBolt: the class really is loaded twice, and the bolt from the WebAppClassLoader obviously cannot be cast to the IBolt type loaded by the system class loader later on.
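The check I used looks roughly like the sketch below. ClassLoaderProbe and describeLoader are hypothetical names made up for this example; in my project the static block sits inside SomeFilteringBolt itself.

```java
public class ClassLoaderProbe {

    // A static initializer runs once per class loader that defines this class,
    // so seeing this line twice in the log means the class was loaded twice.
    static {
        System.out.println("ClassLoaderProbe loaded by: "
                + describeLoader(ClassLoaderProbe.class));
    }

    // Returns a readable description of the loader that defined the given class.
    // getClassLoader() may return null for classes defined by the bootstrap loader.
    static String describeLoader(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        return loader == null ? "bootstrap" : loader.toString();
    }

    public static void main(String[] args) {
        // Running main initializes the class, which triggers the static block above.
        System.out.println("Context class loader: "
                + Thread.currentThread().getContextClassLoader());
    }
}
```

When the same class shows up under two different loaders, the JVM treats the two as unrelated types, which matches the ClassCastException above.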
Expected Behavior
Now this is surprising to me, because I thought that Storm would serialize the StormTopology instance, "send" it locally, deserialize it, and run it. If it did, this would definitely work. Instead, it seems to use the provided StormTopology instance directly, which is a problem when that instance was created under a different class loader.
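To illustrate the kind of round trip I had in mind, here is a minimal sketch using plain Java serialization. RoundTripSketch and roundTrip are hypothetical names for this example, and the code says nothing about what Storm actually does internally; it only shows the serialize-then-deserialize step I expected.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTripSketch {

    // Serializes the value to a byte array and reads it back as a fresh copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                // Classes of the copy are resolved while reading, on the
                // deserializing side, not taken from the original instance.
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        String copy = roundTrip("some bolt state");
        System.out.println(copy);
    }
}
```

Because the copy's classes are looked up again on the deserializing side, I assumed the bolt would end up as a type the executor's class loader knows, instead of the instance created under the WebAppClassLoader.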
What I tried
I tried setting the relevant option to true in order to get Storm to serialize my topology locally. No change.
I tried to run LocalCluster under the system class loader:
ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
try {
    Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
    Config topologyConf = createTopologyConfig();
    Map<String, Object> stormConf = createStormConfig(topologyConf);
    StormTopology topology = createTopology(topologyConf);
    ILocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology(topologyName, stormConf, topology);
} finally {
    Thread.currentThread().setContextClassLoader(originalClassloader);
}
This actually got me a bit further:
Thread died: java.lang.ExceptionInInitializerError
    at clojure.core__init.__init0(Unknown Source)
    at clojure.core__init.<clinit>(Unknown Source)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at clojure.lang.RT.classForName(RT.java:2154)
    at clojure.lang.RT.classForName(RT.java:2163)
    at clojure.lang.RT.loadClassForName(RT.java:2182)
    at clojure.lang.RT.load(RT.java:436)
    at clojure.lang.RT.load(RT.java:412)
    at clojure.lang.RT.doInit(RT.java:454)
    at clojure.lang.RT.<clinit>(RT.java:330)
    at clojure.lang.Namespace.<init>(Namespace.java:34)
    at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
    at clojure.lang.Var.internPrivate(Var.java:151)
    at org.apache.storm.LocalCluster.<clinit>(Unknown Source)
    at my.company.storm.LocalTopologyRunner.startTopology(LocalTopologyRunner.java:146)
    ... 10 more
Caused by: java.lang.IllegalStateException: Attempting to call unbound fn: #'clojure.core/refer
    at clojure.lang.Var$Unbound.throwArity(Var.java:43)
    at clojure.lang.AFn.invoke(AFn.java:32)
    at clojure.lang.Var.invoke(Var.java:379)
    at clojure.lang.RT.doInit(RT.java:467)
    at clojure.lang.RT.<clinit>(RT.java:330)
    ... 18 more
Wat ?!
Question
How can I safely run the Storm topology in local mode from a class loader other than the system class loader?
I am running Apache Storm 1.0.1, Jetty 8.1, Java 8u112 x64, Windows 7 x64.