Tensorflow: loading data in multiple threads on the CPU

I have a python class SceneGenerator that has several member functions for preprocessing and a generator function generate_data(). The basic structure is as follows:

class SceneGenerator(object):
    def __init__(self):
       # some inits

    def generate_data(self):
        """
        Generator. Yield data X and labels y after some preprocessing
        """
        while True:
            # opening files, selecting data
            X,y = self.preprocess(some_params, filenames, ...)            

            yield X, y

I used the class member function sceneGenerator.generate_data() with the Keras function model.fit_generator() to read the data from disk, preprocess it, and yield it. In Keras, this is done on several CPU threads if the workers parameter of model.fit_generator() is set to a value > 1.
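For reference, the Keras call looked roughly like this (a sketch; the steps_per_epoch and workers values are placeholders, and the model is assumed to be already compiled):

sceneGenerator = SceneGenerator(some_params)
model.fit_generator(sceneGenerator.generate_data(),
                    steps_per_epoch=1000,  # placeholder
                    workers=4)             # workers > 1 parallelizes the loading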

Now I want to use the same class SceneGenerator in tensorflow. My current (slow) approach is this:

sceneGenerator = SceneGenerator(some_params)
for X, y in sceneGenerator.generate_data():

    feed_dict = {ops['data']: X,
                 ops['labels']: y,
                 ops['is_training_pl']: True
                 }
    _, loss, prediction = sess.run([optimization_op, loss_op, pred_op],
                                   feed_dict=feed_dict)

This works, but it is slow, since loading and preprocessing the data runs in a single Python thread. The tf.data.Dataset API looks like the right tool for parallelizing this, but I could not figure out how to apply it to my generator.

EDIT: Some details that matter for the answers: preprocessing the data once and storing the result on disk is not an option here. The SceneGenerator reads its raw data from hdf5 files, and converting the hdf5 files to csv is not an option either.
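For illustration, such a generator might look roughly like this (a minimal sketch assuming h5py; the dataset names 'data' and 'labels' and the trivial preprocess body are hypothetical):

import h5py

class SceneGenerator(object):
    def __init__(self, filenames, some_params):
        self.filenames = filenames   # list of hdf5 file paths
        self.some_params = some_params

    def preprocess(self, X, y):
        # placeholder for the real preprocessing
        return X, y

    def generate_data(self):
        while True:
            for filename in self.filenames:
                with h5py.File(filename, 'r') as f:
                    # 'data' and 'labels' are hypothetical dataset names
                    X, y = self.preprocess(f['data'][...], f['labels'][...])
                    yield X, y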

EDIT 2: It seems somebody has already asked a similar question: parallelising tf.data.Dataset.from_generator


Assuming you are using a recent version of Tensorflow (1.4 at the time of writing), you can keep the generator and use the tf.data.* API as follows (I chose arbitrary values for the number of threads, the prefetch buffer size and the batch size; adjust them to your needs):

NUM_THREADS = 5
sceneGen = SceneGenerator()
dataset = tf.data.Dataset.from_generator(sceneGen.generate_data, output_types=(tf.float32, tf.int32))
dataset = dataset.map(lambda x,y : (x,y), num_parallel_calls=NUM_THREADS).prefetch(buffer_size=1000)
dataset = dataset.batch(42)
X, y = dataset.make_one_shot_iterator().get_next()
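Since X and y are now regular graph tensors, the model can be built on top of them and the training loop no longer needs a feed_dict for the data (a sketch, assuming loss_op and optimization_op from the question are defined on X and y):

sess = tf.Session()
sess.run(tf.global_variables_initializer())
for step in range(1000):  # placeholder number of steps
    # no feed_dict needed: the input pipeline feeds X and y itself
    _, loss = sess.run([optimization_op, loss_op])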

To show that the data really is produced by several threads, modify SceneGenerator so that it yields the ID of the thread it runs in instead of the real data:

import threading    
class SceneGenerator(object):
  def __init__(self):
    # some inits
    pass

  def generate_data(self):
    """
    Generator. Yield data X and labels y after some preprocessing
    """
    while True:
      # opening files, selecting data
      X,y = threading.get_ident(), 2 #self.preprocess(some_params, filenames, ...)            
      yield X, y

Now, creating a Tensorflow session and fetching one batch returns data produced by several different threads:

sess = tf.Session()
print(sess.run([X, y]))

[array([  8460.,   8460.,   8460.,  15912.,  16200.,  16200.,   8460.,
         15912.,  16200.,   8460.,  15912.,  16200.,  16200.,   8460.,
         15912.,  15912.,   8460.,   8460.,   6552.,  15912.,  15912.,
          8460.,   8460.,  15912.,   9956.,  16200.,   9956.,  16200.,
         15912.,  15912.,   9956.,  16200.,  15912.,  16200.,  16200.,
         16200.,   6552.,  16200.,  16200.,   9956.,   6552.,   6552.], dtype=float32),
 array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
        2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])]

Note: you may have to experiment with the map call (which we need in order to get the parallel calls) and with the prefetch buffer size, since the values that work best depend on your setup (data size, preprocessing cost, disk and CPU speed, and so on).
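If the expensive Python preprocessing should itself benefit from num_parallel_calls, one option is to move it out of the generator and into the map call via tf.py_func (a sketch; preprocess_fn and its trivial body are placeholders):

import numpy as np

def preprocess_fn(x, y):
    # plain numpy code, executed inside the parallel map threads
    return np.float32(x * 2.0), y  # placeholder for the real preprocessing

dataset = tf.data.Dataset.from_generator(sceneGen.generate_data,
                                         output_types=(tf.float32, tf.int32))
dataset = dataset.map(
    lambda x, y: tuple(tf.py_func(preprocess_fn, [x, y], [tf.float32, tf.int32])),
    num_parallel_calls=NUM_THREADS).prefetch(buffer_size=1000)
dataset = dataset.batch(42)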


A note on why feed_dict is slow:

feed_dict does a single-threaded memcpy of the data from the Python runtime into the TensorFlow runtime.

An alternative is the queue-based input pipeline: tf.train.string_input_producer + a *Reader + tf.train.Coordinator, which read and batch the data in background threads. For example, if the data were stored in csv files, it could look like this:

# record_defaults, N_FEATURES and BATCH_SIZE are assumed to be defined elsewhere
def batch_generator(filenames):
  filename_queue = tf.train.string_input_producer(filenames)
  reader = tf.TextLineReader(skip_header_lines=1)
  _, value = reader.read(filename_queue)

  content = tf.decode_csv(value, record_defaults=record_defaults)
  content[4] = tf.cond(tf.equal(content[4], tf.constant('Present')),
                       lambda: tf.constant(1.0),
                       lambda: tf.constant(0.0))

  features = tf.stack(content[:N_FEATURES])
  label = content[-1]

  data_batch, label_batch = tf.train.shuffle_batch([features, label],
                                                   batch_size=BATCH_SIZE,
                                                   capacity=20*BATCH_SIZE,
                                                   min_after_dequeue=10*BATCH_SIZE)
  return data_batch, label_batch

Note that nothing is read at this point; these calls only build graph operations. The file names go into a queue, the reader pulls lines from it and decodes them, and shuffle_batch assembles the decoded samples into shuffled batches, with the actual work done by background queue-runner threads.

To start (and cleanly stop) those threads, you need a tf.train.Coordinator:

data_batch, label_batch = batch_generator(filenames)  # filenames: list of csv paths
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for _ in range(10):  # generate 10 batches
        features, labels = sess.run([data_batch, label_batch])
        print(features)
    coord.request_stop()
    coord.join(threads)

This way, the CPU threads keep loading and preprocessing data while the GPU runs the training step, so the two no longer block each other.


Source: https://habr.com/ru/post/1690051/

