I would like to compute the mean value of each channel over a set of RGB images, using multiple threads.
My idea was to have a string_input_producer that fills a filename_queue, and then a second FIFOQueue that is filled by num_threads threads, which load images from the file names in filename_queue, perform some ops on them, and then enqueue the result.
This second queue is then read by a single thread (the main thread), which sums up all the values from the queue.
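To make the intended design concrete, here is a minimal sketch of the same pattern in plain Python, using only the standard library instead of TensorFlow. The names per_image_stats and channel_means and the tiny synthetic images are made up for illustration, and images are assumed to be nested lists of shape height x width x 3. It is only meant to show the shape of the pipeline, not how TensorFlow implements its queues.

```python
import queue
import threading


def per_image_stats(image):
    """Return (per-channel sums, pixel count) for one H x W x 3 nested-list image."""
    sums = [0.0, 0.0, 0.0]
    count = 0
    for row in image:
        for pixel in row:
            for c in range(3):
                sums[c] += pixel[c]
            count += 1
    return sums, count


def channel_means(images, num_threads=4):
    """Per-channel mean over all images (assumes at least one non-empty image)."""
    work = queue.Queue()     # plays the role of filename_queue
    results = queue.Queue()  # plays the role of the second FIFOQueue
    for image in images:
        work.put(image)

    def worker():
        # Each worker drains the work queue and enqueues one result per image.
        while True:
            try:
                image = work.get_nowait()
            except queue.Empty:
                return
            results.put(per_image_stats(image))

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()

    # Only the main thread touches the accumulators, so no lock is needed.
    total_sum = [0.0, 0.0, 0.0]
    total_count = 0
    for _ in range(len(images)):
        sums, count = results.get()
        for c in range(3):
            total_sum[c] += sums[c]
        total_count += count

    for t in threads:
        t.join()
    return [s / total_count for s in total_sum]


# Two tiny synthetic "images" (hypothetical data): 1x2 pixels and 1x1 pixel.
images = [
    [[[10, 20, 30], [30, 40, 50]]],
    [[[50, 60, 70]]],
]
means = channel_means(images)
print(means)  # → [30.0, 40.0, 50.0]
```

Because the result is a total of per-image sums divided by a total pixel count, the order in which the workers finish should not change it, which is why I expect a correct implementation to return the same values on every run.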
This is the code I have:
import tensorflow as tf

mean = tf.Variable([0.0, 0.0, 0.0])
total = tf.Variable(0.0)
filename_queue = tf.train.string_input_producer(filenames, num_epochs=1)
reader = tf.WholeFileReader()
_, value = reader.read(filename_queue)
image = tf.image.decode_jpeg(value, channels=3)
image = tf.cast(image, tf.float32)
sum = tf.reduce_sum(image, [0, 1])
num = tf.mul(tf.shape(image)[0], tf.shape(image)[1])
num = tf.cast(num, tf.float32)
queue = tf.FIFOQueue(1000, dtypes=[tf.float32, tf.float32], shapes=[[3], []])
enqueue_op = queue.enqueue([sum, num])
img_sum, img_num = queue.dequeue()
mean_op = tf.add(mean, img_sum)
total_op = tf.add(total, img_num)
qr = tf.train.QueueRunner(queue, [enqueue_op] * num_threads)
tf.train.add_queue_runner(qr)
init_op = tf.initialize_all_variables()
sess = tf.Session()
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
try:
    while not coord.should_stop():
        mean, total = sess.run([mean_op, total_op])
except tf.errors.OutOfRangeError:
    print 'All images processed.'
finally:
    coord.request_stop()
    coord.join(threads)
total_3channel = tf.pack([total, total, total])
mean = tf.div(mean, total_3channel)
mean = sess.run(mean)
print mean
The problem is that every time I run this function, I get different results, for example:
[ 99.35347748 58.35261154 44.56705856]
[ 95.91153717 92.54192352 87.48269653]
[ 124.991745 121.83417511 121.1891861 ]
I suspect a race condition is to blame. But where would such a race condition come from? Can someone help me?