Save Keras ModelCheckpoints in a Google Cloud Bucket

I am training an LSTM network on Google Cloud Machine Learning Engine using Keras with the TensorFlow backend. I managed to deploy my model and run a successful training job after making some adjustments to the gcloud command and my Python script.

I then tried to make my model save checkpoints after every epoch using the Keras ModelCheckpoint callback. Running a local training job with Google Cloud works fine as expected: the weights are stored at the specified path after each epoch. But when I run the same job online on Google Cloud Machine Learning Engine, weights.hdf5 does not get written to my Google Cloud Bucket. Instead, I get the following error:

...
File "h5f.pyx", line 71, in h5py.h5f.open (h5py/h5f.c:1797)
IOError: Unable to open file (Unable to open file: name = 
'gs://.../weights.hdf5', errno = 2, error message = 'no such file or
directory', flags = 0, o_flags = 0)

I investigated this problem, and it turned out that there is no problem with the bucket itself, since the Keras TensorBoard callback works fine and writes the expected output to the same bucket. I also made sure that h5py gets included by listing it in setup.py, located at:

β”œβ”€β”€ setup.py
    └── trainer
    β”œβ”€β”€ __init__.py
    β”œβ”€β”€ ...

The actual inclusion in setup.py is shown below:

# setup.py
from setuptools import setup, find_packages

setup(name='kerasLSTM',
      version='0.1',
      packages=find_packages(),
      author='Kevin Katzke',
      install_requires=['keras','h5py','simplejson'],
      zip_safe=False)

I assume the problem boils down to the fact that GCS cannot be accessed with Python's built-in open() for I/O, since TensorFlow provides a custom implementation instead:

import tensorflow as tf
from tensorflow.python.lib.io import file_io

with file_io.FileIO("gs://...", 'w') as f:
    f.write("Hi!")

After checking how the Keras ModelCheckpoint callback implements the actual file writing, it turns out that it uses h5py.File() for I/O:

with h5py.File(filepath, mode='w') as f:
    f.attrs['keras_version'] = str(keras_version).encode('utf8')
    f.attrs['backend'] = K.backend().encode('utf8')
    f.attrs['model_config'] = json.dumps({
        'class_name': model.__class__.__name__,
        'config': model.get_config()
    }, default=get_json_type).encode('utf8')

Since the h5py package is a Pythonic interface to the HDF5 binary data format, h5py.File() calls into the underlying HDF5 library, which expects a path on the local file system. It therefore cannot open a gs:// location, which explains the error above.

So how can I make the ModelCheckpoint callback write to the GCS bucket? Is there a way to "hack" the hdf5 saving step so that it goes through GCS file_io.FileIO() instead?

Score: +10 (6 answers)

Answers:

The issue can be solved by saving the checkpoint locally first and then copying it over to Google Cloud Storage:

# Save Keras ModelCheckpoints locally
model.save('model.h5')

# Copy model.h5 over to Google Cloud Storage
# (job_dir is the gs:// directory the job writes to)
with file_io.FileIO('model.h5', mode='r') as input_f:
    with file_io.FileIO(os.path.join(job_dir, 'model.h5'), mode='w+') as output_f:
        output_f.write(input_f.read())
        print("Saved model.h5 to GCS")

This way model.h5 is saved locally and then copied over to GCS. As Jochen pointed out, there is currently no out-of-the-box support for writing HDF5 checkpoints straight to GCS, so this hack works around it until an easier solution is available.
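If the copy has to happen after every epoch rather than once at the end of training, one option (not part of the original answer; a minimal sketch with hypothetical paths) is to let ModelCheckpoint write locally and mirror the file to the bucket from a LambdaCallback:

from keras.callbacks import LambdaCallback, ModelCheckpoint
from tensorflow.python.lib.io import file_io

# Hypothetical paths, for illustration only.
LOCAL_PATH = 'weights.hdf5'
GCS_PATH = 'gs://my-bucket/weights.hdf5'

def copy_to_gcs(epoch, logs):
    """Mirror the locally written checkpoint to the bucket."""
    with file_io.FileIO(LOCAL_PATH, mode='rb') as input_f:
        with file_io.FileIO(GCS_PATH, mode='wb+') as output_f:
            output_f.write(input_f.read())

callbacks = [
    ModelCheckpoint(LOCAL_PATH),                # h5py writes locally first
    LambdaCallback(on_epoch_end=copy_to_gcs),   # then the copy runs
]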

Score: +4

I may be a bit late here, but for the sake of future visitors I will describe the whole process of adapting code that runs locally so that it is GoogleML-aware.

  1. Python's standard open(file_name, mode) does not work with buckets (gs://...../file_name). You need to from tensorflow.python.lib.io import file_io and replace every call to open(file_name, mode) with file_io.FileIO(file_name, mode=mode) (note the named mode parameter). The interface of the opened handle is the same; see the sketch right after this list.
  2. Keras and/or other libraries mostly use the standard open(file_name, mode) internally, so a call like trained_model.save(file_path) into a third-party library will fail to store the result in the bucket. The only way to retrieve a model after the job has finished successfully is to save it locally and then move it to the bucket.
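As a minimal sketch of point 1 (the bucket path and file name are placeholders):

from tensorflow.python.lib.io import file_io

# Locally this would be: with open('config.json', 'r') as f:
with file_io.FileIO('gs://my-bucket/config.json', mode='r') as f:
    config = f.read()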

The code below is quite inefficient, since it loads the whole model into memory at once and then dumps it to the bucket, but it worked for me with relatively small models:

model.save(file_path)

with file_io.FileIO(file_path, mode='rb') as input_f:
    with file_io.FileIO(os.path.join(model_dir, file_path), mode='wb+') as output_f:
        output_f.write(input_f.read())

Note that the mode has to be set to binary for both reading and writing.

When the file is relatively big, it also makes sense to read and write it in chunks to decrease memory consumption; a sketch follows below.
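A chunked copy could look like this (a sketch, not part of the original answer; copy_in_chunks and the 1 MiB chunk size are illustrative):

import os
from tensorflow.python.lib.io import file_io

def copy_in_chunks(src, dst, chunk_size=1024 * 1024):
    """Copy src to dst without loading the whole file into memory."""
    with file_io.FileIO(src, mode='rb') as input_f:
        with file_io.FileIO(dst, mode='wb+') as output_f:
            while True:
                chunk = input_f.read(chunk_size)
                if not chunk:
                    break
                output_f.write(chunk)

# e.g. copy_in_chunks('model.h5', os.path.join(model_dir, 'model.h5'))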

Before running a real task, I would also advise you to run a stub that simply saves a file to the remote bucket.

This implementation, temporarily put in place of the real train_model call, should do:

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--job-dir',
        help='GCS location with read/write access',
        required=True
    )

    args = parser.parse_args()
    arguments = args.__dict__
    job_dir = arguments.pop('job_dir')

    with file_io.FileIO(os.path.join(job_dir, "test.txt"), mode='w+') as output_f:
        output_f.write("Test passed.")

After a successful run, the file test.txt with the content "Test passed." is saved to your job directory in the bucket.
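To double-check, the file can be read back through the same API (a sketch, reusing job_dir from the stub above):

# Read the file back to confirm the write actually reached the bucket.
with file_io.FileIO(os.path.join(job_dir, "test.txt"), mode='r') as f:
    print(f.read())  # expected output: Test passed.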

Score: +12

This is also how the TF file_io API is used in the official Keras sample for GoogleCloudPlatform ML Engine.

Since a GCS path (one beginning with "gs://") cannot be written by h5py directly, the sample saves the model to a local file first and then copies it to GCS with the TF file_io API. See: https://github.com/GoogleCloudPlatform/cloudml-samples/blob/master/census/keras/trainer/task.py#L146
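The pattern in that sample boils down to the following (a paraphrased sketch, not a verbatim excerpt; job_dir and model are assumed to be defined as in the answers above, and copy_file_to_gcs is a helper like the one shown in the next answer):

import os

CHECKPOINT_NAME = 'model.h5'  # illustrative file name

if job_dir.startswith('gs://'):
    # h5py cannot write to GCS directly: save locally, then copy over.
    model.save(CHECKPOINT_NAME)
    copy_file_to_gcs(job_dir, CHECKPOINT_NAME)
else:
    model.save(os.path.join(job_dir, CHECKPOINT_NAME))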

Score: +3

The accepted answer worked for me after one small modification: the checkpoint file has to be read and written in binary mode, otherwise the following error is thrown:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x89 in position 0: invalid start byte

The fixed copy helper:

def copy_file_to_gcs(job_dir, file_path):
    with file_io.FileIO(file_path, mode='rb') as input_f:
        with file_io.FileIO(os.path.join(job_dir, file_path), mode='wb+') as output_f:
            output_f.write(input_f.read())
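Usage is then, for example (paths are placeholders):

# job_dir is typically the gs:// path passed via --job-dir.
model.save('model.h5')
copy_file_to_gcs('gs://my-bucket/my-job-dir', 'model.h5')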

Score: +2

Alternatively, you can subclass the ModelCheckpoint callback itself so that it writes to GCS transparently and the rest of the training code stays unchanged:

import os
import warnings

from keras.callbacks import ModelCheckpoint
from tensorflow.python.lib.io import file_io


class ModelCheckpointGC(ModelCheckpoint):
    """Taken from and modified:
    https://github.com/keras-team/keras/blob/tf-keras/keras/callbacks.py
    """

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        self.epochs_since_last_save += 1
        if self.epochs_since_last_save >= self.period:
            self.epochs_since_last_save = 0
            filepath = self.filepath.format(epoch=epoch, **logs)
            if self.save_best_only:
                current = logs.get(self.monitor)
                if current is None:
                    warnings.warn('Can save best model only with %s available, '
                                  'skipping.' % (self.monitor), RuntimeWarning)
                else:
                    if self.monitor_op(current, self.best):
                        if self.verbose > 0:
                            print('Epoch %05d: %s improved from %0.5f to %0.5f,'
                                  ' saving model to %s'
                                  % (epoch, self.monitor, self.best,
                                     current, filepath))
                        self.best = current
                        if self.save_weights_only:
                            self.model.save_weights(filepath, overwrite=True)
                        else:
                            if is_development():
                                self.model.save(filepath, overwrite=True)
                            else:
                                # On ML Engine: save under the bare file name
                                # locally, then copy the file to the gs:// path.
                                self.model.save(filepath.split("/")[-1])
                                with file_io.FileIO(filepath.split("/")[-1],
                                                    mode='rb') as input_f:
                                    with file_io.FileIO(filepath,
                                                        mode='wb+') as output_f:
                                        output_f.write(input_f.read())
                    else:
                        if self.verbose > 0:
                            print('Epoch %05d: %s did not improve' %
                                  (epoch, self.monitor))
            else:
                if self.verbose > 0:
                    print('Epoch %05d: saving model to %s' % (epoch, filepath))
                if self.save_weights_only:
                    self.model.save_weights(filepath, overwrite=True)
                else:
                    if is_development():
                        self.model.save(filepath, overwrite=True)
                    else:
                        self.model.save(filepath.split("/")[-1])
                        with file_io.FileIO(filepath.split("/")[-1],
                                            mode='rb') as input_f:
                            with file_io.FileIO(filepath,
                                                mode='wb+') as output_f:
                                output_f.write(input_f.read())

Here is_development() is a helper that checks whether the code is running locally or on gcloud. On the local machine, set the environment variable LOCAL_ENV=1:

def is_development():
    """check if the environment is local or in the gcloud
    created the local variable in bash profile
    export LOCAL_ENV=1

    Returns:
        [boolean] -- True if local env
    """
    try:
        if os.environ['LOCAL_ENV'] == '1':
            return True
        else:
            return False
    except KeyError:
        return False

The callback is then used in place of the standard ModelCheckpoint:

checkpoint = ModelCheckpointGC(
    'gs://your_bucket/models/model.h5',
    monitor='loss',
    verbose=1,
    save_best_only=True,
    mode='min')
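and passed to training as usual (a sketch; x_train and y_train are placeholders):

model.fit(x_train, y_train, epochs=10, callbacks=[checkpoint])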

Hopefully this saves somebody some time.

Score: +1

I'm not sure why this is not already mentioned, but there is a solution in which you do not need to add a copy function to your code.

Install gcsfuse by doing the following:

export GCSFUSE_REPO=gcsfuse-$(lsb_release -c -s)
echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt-get update
sudo apt-get install gcsfuse

Then mount your bucket locally:

mkdir bucket
gcsfuse <cloud_bucket_name> bucket

and then use the local directory bucket/ as the logdir of your model.

Cloud and local directory synchronization will be automated for you, and your code can stay clean.
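For example, the standard ModelCheckpoint can then write straight through the mount point (a sketch; the path under bucket/ is illustrative):

from keras.callbacks import ModelCheckpoint

# 'bucket/' is the local gcsfuse mount point created above.
checkpoint = ModelCheckpoint('bucket/models/weights.hdf5')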

Hope this helps :)

Score: +1

Source: https://habr.com/ru/post/1683318/

