Keras/TensorFlow - exception when predicting from multiple threads

I am using keras 2.0.8 with tensorflow 1.3.0 backend.

I load the model in the class's __init__ and then use it to predict from multiple threads.

    import tensorflow as tf
    from keras import backend as K
    from keras.models import load_model

    class CNN:
        def __init__(self, model_path):
            self.cnn_model = load_model(model_path)
            self.session = K.get_session()
            self.graph = tf.get_default_graph()

        def query_cnn(self, data):
            X = self.preproccesing(data)
            with self.session.as_default():
                with self.graph.as_default():
                    return self.cnn_model.predict(X)

I initialize CNN once, and the query_cnn method is called from multiple threads.

The exception that I get in my log is:

      File "/home/*/Similarity/CNN.py", line 43, in query_cnn
        return self.cnn_model.predict(X)
      File "/usr/local/lib/python3.5/dist-packages/keras/models.py", line 913, in predict
        return self.model.predict(x, batch_size=batch_size, verbose=verbose)
      File "/usr/local/lib/python3.5/dist-packages/keras/engine/training.py", line 1713, in predict
        verbose=verbose, steps=steps)
      File "/usr/local/lib/python3.5/dist-packages/keras/engine/training.py", line 1269, in _predict_loop
        batch_outs = f(ins_batch)
      File "/usr/local/lib/python3.5/dist-packages/keras/backend/tensorflow_backend.py", line 2273, in __call__
        **self.session_kwargs)
      File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 895, in run
        run_metadata_ptr)
      File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 1124, in _run
        feed_dict_tensor, options, run_metadata)
      File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 1321, in _do_run
        options, run_metadata)
      File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 1340, in _do_call
        raise type(e)(node_def, op, message)
    tensorflow.python.framework.errors_impl.NotFoundError: PruneForTargets: Some target nodes not found: group_deps

The code works fine in most cases; the failure is probably related to the multithreaded access.

How can i fix this?

1 answer

Before creating other threads, make sure you have finished building the graph.

Calling finalize() on the graph can help you with this.

    def __init__(self, model_path):
        self.cnn_model = load_model(model_path)
        self.session = K.get_session()
        self.graph = tf.get_default_graph()
        self.graph.finalize()

Update 1: finalize() makes your graph read-only, so it can be safely used across multiple threads. As a side effect, it helps you catch unintended behavior, and sometimes memory leaks, because it throws an exception whenever you try to modify the graph.
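To see what finalize() guards against, here is a small standalone sketch (my addition, assuming the TF 1.x-style graph API, reached via the tf.compat.v1 shim on TF 2.x):

```python
import tensorflow as tf

# On TF 2.x, graph-mode op construction needs eager execution disabled;
# on very old TF 1.x builds the compat.v1 shim may not exist, so adapt.
tf.compat.v1.disable_eager_execution()

g = tf.Graph()
with g.as_default():
    # Building ops is fine before finalize()
    a = tf.compat.v1.constant(1.0, name="before_finalize")

g.finalize()  # graph is now read-only

try:
    with g.as_default():
        tf.compat.v1.constant(2.0, name="after_finalize")
except RuntimeError as exc:
    # Any attempt to add a node to a finalized graph raises RuntimeError
    print("modification rejected:", exc)
```

The RuntimeError here is exactly the signal you want during debugging: it points at whichever thread is still trying to mutate the graph.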

Imagine you have a thread that does, for example, one-hot encoding of your inputs (a deliberately bad example):

    def preprocessing(self, data):
        one_hot_data = tf.one_hot(data, depth=self.num_classes)
        return self.session.run(one_hot_data)

If you print the number of nodes in the graph, you will notice that it keeps increasing over time:

    # number of nodes in the tf graph
    print(len(list(tf.get_default_graph().as_graph_def().node)))

But if you define the ops up front, the node count stays constant (slightly adapted code):

    # in __init__, created once (sketch):
    #     self.input = tf.placeholder(tf.int32, shape=[None])
    #     self.one_hot_data = tf.one_hot(self.input, depth=self.num_classes)

    def preprocessing(self, data):
        # run the pre-created op, feeding data through the placeholder
        return self.session.run(self.one_hot_data, feed_dict={self.input: data})

Update 2: According to this thread, you need to call model._make_predict_function() on the Keras model before starting your threads.

Keras builds the predict function the first time predict() is called. That way, if you never call predict, you save some time and resources. However, the first call to predict() is slightly slower than every subsequent call.

Updated code:

    def __init__(self, model_path):
        self.cnn_model = load_model(model_path)
        self.cnn_model._make_predict_function()  # have to initialize before threading
        self.session = K.get_session()
        self.graph = tf.get_default_graph()
        self.graph.finalize()  # make graph read-only

Update 3: I made a proof of concept using a warmup call, because _make_predict_function() did not seem to work reliably. First I created a dummy model:

    import tensorflow as tf
    from keras.layers import *
    from keras.models import *

    model = Sequential()
    model.add(Dense(256, input_shape=(2,)))
    model.add(Dense(1, activation='softmax'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.save("dummymodel")

Then in another script I loaded this model and ran it from multiple threads:

    import tensorflow as tf
    from keras import backend as K
    from keras.models import load_model
    import threading as t
    import numpy as np

    K.clear_session()

    class CNN:
        def __init__(self, model_path):
            self.cnn_model = load_model(model_path)
            self.cnn_model.predict(np.array([[0, 0]]))  # warmup
            self.session = K.get_session()
            self.graph = tf.get_default_graph()
            self.graph.finalize()  # finalize

        def preproccesing(self, data):
            # dummy
            return data

        def query_cnn(self, data):
            X = self.preproccesing(data)
            with self.session.as_default():
                with self.graph.as_default():
                    prediction = self.cnn_model.predict(X)
                    print(prediction)
                    return prediction

    cnn = CNN("dummymodel")

    th = t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})
    th2 = t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})
    th3 = t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})
    th4 = t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})
    th5 = t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})

    th.start()
    th2.start()
    th3.start()
    th4.start()
    th5.start()

    th2.join()
    th.join()
    th3.join()
    th5.join()
    th4.join()

Commenting out the warmup and finalize lines, I was able to reproduce your first problem.
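As a footnote: if finalizing the graph is inconvenient, another common mitigation is to serialize calls into the model with a lock, so only one thread touches it at a time. A minimal, framework-agnostic sketch (LockedModel and FakeModel are hypothetical stand-ins, not Keras APIs; in practice you would wrap the loaded Keras model):

```python
import threading

class LockedModel:
    """Wrap a non-thread-safe model so predictions are serialized.

    `model` is any object with a predict() method; in practice the
    loaded Keras model would be passed in instead of the stand-in below.
    """
    def __init__(self, model):
        self._model = model
        self._lock = threading.Lock()

    def predict(self, x):
        # Only one thread at a time may touch the underlying model.
        with self._lock:
            return self._model.predict(x)

class FakeModel:  # hypothetical stand-in for load_model(...)
    def predict(self, x):
        return [v * 2 for v in x]

shared = LockedModel(FakeModel())
results = []
threads = [threading.Thread(target=lambda: results.append(shared.predict([1, 2])))
           for _ in range(4)]
for th in threads:
    th.start()
for th in threads:
    th.join()
print(results)  # four identical [2, 4] entries
```

Serializing costs throughput under load, but it sidesteps concurrent graph access entirely and does not depend on the backend version.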


Source: https://habr.com/ru/post/1271424/

