From b03159e77c79eb65d5f94709809a5e168e788138 Mon Sep 17 00:00:00 2001 From: You Quan Chong Date: Mon, 6 Dec 2021 13:37:32 -0800 Subject: [PATCH] Update TF Model Garden dependency PiperOrigin-RevId: 414528977 --- src/python/dependencies.py | 1 + .../core/experimental/models.py | 449 +++++++++--------- .../experimental/tests/unit/models_test.py | 434 ++++++++--------- 3 files changed, 443 insertions(+), 441 deletions(-) diff --git a/src/python/dependencies.py b/src/python/dependencies.py index 383abd1b..4068413c 100644 --- a/src/python/dependencies.py +++ b/src/python/dependencies.py @@ -23,6 +23,7 @@ def make_required_install_packages(): "google-auth", "google-cloud-storage", "keras-tuner", + "keras==2.6.*", "tensorboard>=2.3.0", "tensorflow>=1.15.0,<3.0", "tensorflow_datasets", diff --git a/src/python/tensorflow_cloud/core/experimental/models.py b/src/python/tensorflow_cloud/core/experimental/models.py index c7bae9c8..8219ba4a 100644 --- a/src/python/tensorflow_cloud/core/experimental/models.py +++ b/src/python/tensorflow_cloud/core/experimental/models.py @@ -24,15 +24,15 @@ import tensorflow as tf import tensorflow_datasets as tfds -from official.vision.image_classification.efficientnet import efficientnet_model -from official.vision.image_classification.resnet import resnet_model +from official.legacy.image_classification.efficientnet import efficientnet_model +from official.legacy.image_classification.resnet import resnet_model # pylint: disable=g-import-not-at-top try: - import importlib.resources as pkg_resources + import importlib.resources as pkg_resources except ImportError: - # Backported for python<3.7 - import importlib_resources as pkg_resources + # Backported for python<3.7 + import importlib_resources as pkg_resources # pylint: enable=g-import-not-at-top _PARAMS_FILE_NAME_FORMAT = '{}_params' @@ -50,274 +50,273 @@ def run_models(dataset_name: str, batch_size: Optional[int] = 128, job_name: Optional[str] = '', **run_kwargs) -> Optional[Dict[str, str]]: - """A wrapper for tfc.run that runs models from TF Model Garden on the Cloud. - - This method allows for running models from TF Model Garden using datasets - from TFDS directly on the Cloud. Currently it only supports image - classification models. Specifically ResNet and EfficientNet. - - Args: - dataset_name: the registered name of the `DatasetBuilder` (the snake - case version of the class name). This can be either `'dataset_name'` or - `'dataset_name/config_name'` for datasets with `BuilderConfig`s. - model_name: the name of the model. Currently it supports: - -'resnet': For the resnet50 model. - -'efficientnet': For the efficientnet-b0 model. - -Any efficientnet configuration present in - efficientnet_model.MODEL_CONFIGS. Use the key as it appears in the - dictionary. - gcs_bucket: The gcs bucket that is going to be used to build and store - the training. - train_split: Which split of the data to load for training. Available - options depend on the dataset and can be found on the TFDS docs. - validation_split: Which split of the data to load for validation. - Available options depend on the dataset and can be found on the TFDS - docs. If None is provided, then 0.2 of the training split will be used - for validation. - one_hot: If True it performs one hot encoding on the label, and it - assumes the label is the index. - epochs: The number of epochs that are going to be used for training. - batch_size: The batch size to use during training. - job_name: The name of the job in GCP. 
- **run_kwargs: keyword arguments for `tfc.run()`. - - Returns: - A dictionary with five keys. - 1. 'job_id': the training job id. - 2. 'docker_image': Docker image generated for the training job. - 3. 'tensorboard_logs': the path to the tensorboard logs registered - during callbacks. - 4. 'model_checkpoint': the path to the model checkpoints registered - during callbacks. - 5. 'saved_model': the path to the saved model. - """ - model_dirs = get_model_dirs(gcs_bucket, job_name) - - if run.remote(): - classifier_trainer(dataset_name, model_name, batch_size, epochs, - train_split, validation_split, one_hot, model_dirs) - return - - validate_input() - if 'job_labels' in run_kwargs and job_name: - run_kwargs['job_labels']['job'] = job_name - elif job_name: - run_kwargs['job_labels'] = {'job': job_name} - - run_results = run.run(**run_kwargs) - run_results.update(model_dirs) - return run_results + """A wrapper for tfc.run that runs models from TF Model Garden on the Cloud. + + This method allows for running models from TF Model Garden using datasets + from TFDS directly on the Cloud. Currently it only supports image + classification models. Specifically ResNet and EfficientNet. + + Args: + dataset_name: the registered name of the `DatasetBuilder` (the snake case + version of the class name). This can be either `'dataset_name'` or + `'dataset_name/config_name'` for datasets with `BuilderConfig`s. + model_name: the name of the model. Currently it supports: + -'resnet': For the resnet50 model. + -'efficientnet': For the efficientnet-b0 model. -Any efficientnet + configuration present in efficientnet_model.MODEL_CONFIGS. Use the key + as it appears in the dictionary. + gcs_bucket: The gcs bucket that is going to be used to build and store the + training. + train_split: Which split of the data to load for training. Available options + depend on the dataset and can be found on the TFDS docs. + validation_split: Which split of the data to load for validation. Available + options depend on the dataset and can be found on the TFDS docs. If None + is provided, then 0.2 of the training split will be used for validation. + one_hot: If True it performs one hot encoding on the label, and it assumes + the label is the index. + epochs: The number of epochs that are going to be used for training. + batch_size: The batch size to use during training. + job_name: The name of the job in GCP. + **run_kwargs: keyword arguments for `tfc.run()`. + + Returns: + A dictionary with five keys. + 1. 'job_id': the training job id. + 2. 'docker_image': Docker image generated for the training job. + 3. 'tensorboard_logs': the path to the tensorboard logs registered + during callbacks. + 4. 'model_checkpoint': the path to the model checkpoints registered + during callbacks. + 5. 'saved_model': the path to the saved model. 
+ """ + model_dirs = get_model_dirs(gcs_bucket, job_name) + + if run.remote(): + classifier_trainer(dataset_name, model_name, batch_size, epochs, + train_split, validation_split, one_hot, model_dirs) + return + + validate_input() + if 'job_labels' in run_kwargs and job_name: + run_kwargs['job_labels']['job'] = job_name + elif job_name: + run_kwargs['job_labels'] = {'job': job_name} + + run_results = run.run(**run_kwargs) + run_results.update(model_dirs) + return run_results def get_model_dirs(gcs_bucket, job_name): - gcs_base_path = f'gs://{gcs_bucket}/{job_name}' - return {'tensorboard_logs': os.path.join(gcs_base_path, 'logs'), - 'model_checkpoint': - os.path.join(gcs_base_path, 'checkpoints'), - 'saved_model': os.path.join(gcs_base_path, 'saved_model')} + gcs_base_path = f'gs://{gcs_bucket}/{job_name}' + return { + 'tensorboard_logs': os.path.join(gcs_base_path, 'logs'), + 'model_checkpoint': os.path.join(gcs_base_path, 'checkpoints'), + 'saved_model': os.path.join(gcs_base_path, 'saved_model') + } # TODO(uribejuan): Write function to make sure the input is valid def validate_input(): - pass + pass def classifier_trainer(dataset_name, model_name, batch_size, epochs, train_split, validation_split, one_hot, model_dirs): - """Training loop for image classifier from TF model Garden using TFDS.""" - builder = tfds.builder(dataset_name) - - num_classes = builder.info.features['label'].num_classes - model = get_model(model_name, batch_size, num_classes) - - if model_name == 'resnet': - image_size = 224 - width_ratio = 1 - else: # Assumes model_name is an efficientnet version - image_size = model.config.resolution - width_ratio = model.config.width_coefficient - - train_ds, validation_ds = load_data_from_builder(builder, train_split, - validation_split, - image_size, width_ratio, - batch_size, one_hot, - num_classes) - callbacks = [ - tf.keras.callbacks.TensorBoard(log_dir=model_dirs['tensorboard_logs']), - tf.keras.callbacks.ModelCheckpoint( - model_dirs['model_checkpoint'], save_best_only=True), - tf.keras.callbacks.EarlyStopping( - monitor='loss', min_delta=0.001, patience=3) - ] - - model.compile( - optimizer=tf.keras.optimizers.Adam(), - loss=tf.keras.losses.CategoricalCrossentropy(), - metrics=[tf.keras.metrics.CategoricalAccuracy(dtype=tf.float32)], - ) - - model.fit( - train_ds, - validation_data=validation_ds, - epochs=epochs, - callbacks=callbacks) - - model.save(model_dirs['saved_model']) + """Training loop for image classifier from TF model Garden using TFDS.""" + builder = tfds.builder(dataset_name) + + num_classes = builder.info.features['label'].num_classes + model = get_model(model_name, batch_size, num_classes) + + if model_name == 'resnet': + image_size = 224 + width_ratio = 1 + else: # Assumes model_name is an efficientnet version + image_size = model.config.resolution + width_ratio = model.config.width_coefficient + + train_ds, validation_ds = load_data_from_builder(builder, train_split, + validation_split, image_size, + width_ratio, batch_size, + one_hot, num_classes) + callbacks = [ + tf.keras.callbacks.TensorBoard(log_dir=model_dirs['tensorboard_logs']), + tf.keras.callbacks.ModelCheckpoint( + model_dirs['model_checkpoint'], save_best_only=True), + tf.keras.callbacks.EarlyStopping( + monitor='loss', min_delta=0.001, patience=3) + ] + + model.compile( + optimizer=tf.keras.optimizers.Adam(), + loss=tf.keras.losses.CategoricalCrossentropy(), + metrics=[tf.keras.metrics.CategoricalAccuracy(dtype=tf.float32)], + ) + + model.fit( + train_ds, + validation_data=validation_ds, + 
epochs=epochs, + callbacks=callbacks) + + model.save(model_dirs['saved_model']) def load_data_from_builder(builder, train_split, validation_split, image_size, width_ratio, batch_size, one_hot, num_classes): - """Loads the train and validation dataset from a dataset builder.""" - builder.download_and_prepare() - - num_examples = builder.info.splits[train_split].num_examples - train_ds = builder.as_dataset( - train_split, shuffle_files=True, as_supervised=True) - train_ds = data_pipeline(train_ds, image_size, width_ratio, batch_size, - num_classes, one_hot, - num_examples) - - if validation_split is not None: - validation_ds = builder.as_dataset( - validation_split, shuffle_files=True, as_supervised=True) - validation_ds = data_pipeline(validation_ds, image_size, width_ratio, - batch_size, num_classes, one_hot, - num_examples) - else: - validation_ds = None + """Loads the train and validation dataset from a dataset builder.""" + builder.download_and_prepare() + + num_examples = builder.info.splits[train_split].num_examples + train_ds = builder.as_dataset( + train_split, shuffle_files=True, as_supervised=True) + train_ds = data_pipeline(train_ds, image_size, width_ratio, batch_size, + num_classes, one_hot, num_examples) + + if validation_split is not None: + validation_ds = builder.as_dataset( + validation_split, shuffle_files=True, as_supervised=True) + validation_ds = data_pipeline(validation_ds, image_size, width_ratio, + batch_size, num_classes, one_hot, + num_examples) + else: + validation_ds = None - return train_ds, validation_ds + return train_ds, validation_ds def get_model(model_name, batch_size, num_classes): - """Gets model_name from TF Model Garden.""" - if model_name == 'resnet': - return load_resnet(batch_size, num_classes) - elif model_name == 'efficientnet': - return load_efficientnet(num_classes) - elif model_name in efficientnet_model.MODEL_CONFIGS: - return load_efficientnet(num_classes, model_name) - raise TypeError(f'Unknown model_name argument: {model_name}') + """Gets model_name from TF Model Garden.""" + if model_name == 'resnet': + return load_resnet(batch_size, num_classes) + elif model_name == 'efficientnet': + return load_efficientnet(num_classes) + elif model_name in efficientnet_model.MODEL_CONFIGS: + return load_efficientnet(num_classes, model_name) + raise TypeError(f'Unknown model_name argument: {model_name}') def load_resnet(batch_size, num_classes): - """Loads the ResNet model from TF Model Garden.""" - return resnet_model.resnet50( - batch_size=batch_size, num_classes=num_classes) + """Loads the ResNet model from TF Model Garden.""" + return resnet_model.resnet50(batch_size=batch_size, num_classes=num_classes) def load_efficientnet(num_classes, model_name='efficientnet-b0'): - """Loads the EfficientNet from TF Model Garden.""" - overrides = { - 'num_classes': num_classes, - } - return efficientnet_model.EfficientNet.from_name( - model_name, overrides=overrides) + """Loads the EfficientNet from TF Model Garden.""" + overrides = { + 'num_classes': num_classes, + } + return efficientnet_model.EfficientNet.from_name( + model_name, overrides=overrides) def normalize_img_and_label(image, label, image_size, width_ratio=1, num_classes=None, one_hot=False): - """Normalizes the image and label according to the params specified.""" - if one_hot: - label = tf.one_hot(label, num_classes, dtype=tf.dtypes.int8) - image = tf.image.resize_with_pad(image, image_size, - int(image_size * width_ratio)) - return image, label + """Normalizes the image and label according to the 
params specified.""" + if one_hot: + label = tf.one_hot(label, num_classes, dtype=tf.dtypes.int8) + image = tf.image.resize_with_pad(image, image_size, + int(image_size * width_ratio)) + return image, label def data_pipeline(original_ds, image_size, width_ratio, batch_size, num_classes, one_hot, num_examples): - """Pipeline for pre-processing the data.""" - norm_args = {'image_size': image_size, 'width_ratio': width_ratio, - 'num_classes': num_classes, 'one_hot': one_hot} - ds = original_ds.map( - lambda image, label: normalize_img_and_label(image, label, **norm_args), - num_parallel_calls=tf.data.experimental.AUTOTUNE) - ds = ds.cache() - ds = ds.shuffle(min(num_examples, 1000)) - ds = ds.batch(batch_size, drop_remainder=True) - ds = ds.prefetch(tf.data.experimental.AUTOTUNE) - return ds + """Pipeline for pre-processing the data.""" + norm_args = { + 'image_size': image_size, + 'width_ratio': width_ratio, + 'num_classes': num_classes, + 'one_hot': one_hot + } + ds = original_ds.map( + lambda image, label: normalize_img_and_label(image, label, **norm_args), + num_parallel_calls=tf.data.experimental.AUTOTUNE) + ds = ds.cache() + ds = ds.shuffle(min(num_examples, 1000)) + ds = ds.batch(batch_size, drop_remainder=True) + ds = ds.prefetch(tf.data.experimental.AUTOTUNE) + return ds def run_experiment_cloud(run_experiment_kwargs: Dict[str, Any], run_kwargs: Optional[Dict[str, Any]] = None, ) -> Optional[Dict[str, str]]: - """A wrapper for run API and tf-models-official run_experiment. - - This method takes a dictionary of the parameters for run and a dictionary - of the parameters for run_experiment to run the experiment directly on GCP. - - Args: - run_experiment_kwargs: keyword arguments for `train_lib.run_experiment`. - The docs can be found at - https://github.com/tensorflow/models/blob/master/official/core/train_lib.py - The distribution_strategy param is ignored because the distribution - strategy is selected based on run_kwargs. - run_kwargs: keyword arguments for `tfc.run`. The docs can be found at - https://github.com/tensorflow/cloud/blob/master/src/python/tensorflow_cloud/core/run.py - The params entry_point and distribution_strategy are ignored. - Returns: - A dictionary with two keys. - 1. 'job_id': the training job id. - 2. 'docker_image': Docker image generated for the training job. - """ - if run_kwargs is None: - run_kwargs = dict() - distribution_strategy = get_distribution_strategy_str(run_kwargs) - run_experiment_kwargs.update( - dict(distribution_strategy=distribution_strategy)) - file_id = str(uuid.uuid4()) - params_file = save_params(run_experiment_kwargs, file_id) - entry_point = copy_entry_point(file_id, params_file) - - run_kwargs.update(dict(entry_point=entry_point, - distribution_strategy=None)) - info = run.run(**run_kwargs) - os.remove(entry_point) - os.remove(params_file) - return info + """A wrapper for run API and tf-models-official run_experiment. + + This method takes a dictionary of the parameters for run and a dictionary + of the parameters for run_experiment to run the experiment directly on GCP. + + Args: + run_experiment_kwargs: keyword arguments for `train_lib.run_experiment`. The + docs can be found at + https://github.com/tensorflow/models/blob/master/official/core/train_lib.py + The distribution_strategy param is ignored because the distribution + strategy is selected based on run_kwargs. + run_kwargs: keyword arguments for `tfc.run`. 
The docs can be found at + https://github.com/tensorflow/cloud/blob/master/src/python/tensorflow_cloud/core/run.py + The params entry_point and distribution_strategy are ignored. + + Returns: + A dictionary with two keys. + 1. 'job_id': the training job id. + 2. 'docker_image': Docker image generated for the training job. + """ + if run_kwargs is None: + run_kwargs = dict() + distribution_strategy = get_distribution_strategy_str(run_kwargs) + run_experiment_kwargs.update( + dict(distribution_strategy=distribution_strategy)) + file_id = str(uuid.uuid4()) + params_file = save_params(run_experiment_kwargs, file_id) + entry_point = copy_entry_point(file_id, params_file) + + run_kwargs.update(dict(entry_point=entry_point, distribution_strategy=None)) + info = run.run(**run_kwargs) + os.remove(entry_point) + os.remove(params_file) + return info def copy_entry_point(file_id, params_file): - """Copy models_entry_point and add params file name.""" - lines = get_original_lines() - entry_point = _ENTRY_POINT_FORMAT.format(file_id) - with open(entry_point, 'w') as entry_file: - for line in lines: - if line.startswith('PARAMS_FILE_NAME = '): - entry_file.write(f"PARAMS_FILE_NAME = '{params_file}'\n") - else: - entry_file.write(line) - return entry_point + """Copy models_entry_point and add params file name.""" + lines = get_original_lines() + entry_point = _ENTRY_POINT_FORMAT.format(file_id) + with open(entry_point, 'w') as entry_file: + for line in lines: + if line.startswith('PARAMS_FILE_NAME = '): + entry_file.write(f"PARAMS_FILE_NAME = '{params_file}'\n") + else: + entry_file.write(line) + return entry_point def get_original_lines(): - """Gets the file lines of models_entry_point.py as a list of strings.""" - with pkg_resources.open_text(__package__, _ENTRY_POINT_TEMPLATE) as file: - lines = file.readlines() - return lines + """Gets the file lines of models_entry_point.py as a list of strings.""" + with pkg_resources.open_text(__package__, _ENTRY_POINT_TEMPLATE) as file: + lines = file.readlines() + return lines def get_distribution_strategy_str(run_kwargs): - """Gets the name of a distribution strategy based on cloud run config.""" - if ('worker_count' in run_kwargs - and run_kwargs['worker_count'] > 0): - if ('worker_config' in run_kwargs - and machine_config.is_tpu_config(run_kwargs['worker_config'])): - return 'tpu' - else: - return 'multi_mirror' - elif ('chief_config' in run_kwargs - and run_kwargs['chief_config'].accelerator_count > 1): - return 'mirror' + """Gets the name of a distribution strategy based on cloud run config.""" + if ('worker_count' in run_kwargs and run_kwargs['worker_count'] > 0): + if ('worker_config' in run_kwargs and + machine_config.is_tpu_config(run_kwargs['worker_config'])): + return 'tpu' else: - return 'one_device' + return 'multi_mirror' + elif ('chief_config' in run_kwargs and + run_kwargs['chief_config'].accelerator_count > 1): + return 'mirror' + else: + return 'one_device' def save_params(params, file_id): - """Pickles the params object using the file_id as prefix.""" - file_name = _PARAMS_FILE_NAME_FORMAT.format(file_id) - with open(file_name, 'xb') as f: - pickle.dump(params, f) - return file_name + """Pickles the params object using the file_id as prefix.""" + file_name = _PARAMS_FILE_NAME_FORMAT.format(file_id) + with open(file_name, 'xb') as f: + pickle.dump(params, f) + return file_name diff --git a/src/python/tensorflow_cloud/core/experimental/tests/unit/models_test.py b/src/python/tensorflow_cloud/core/experimental/tests/unit/models_test.py index 
7c145805..fc281278 100644 --- a/src/python/tensorflow_cloud/core/experimental/tests/unit/models_test.py +++ b/src/python/tensorflow_cloud/core/experimental/tests/unit/models_test.py @@ -23,226 +23,228 @@ from tensorflow_cloud.core import machine_config from tensorflow_cloud.core import run from tensorflow_cloud.core.experimental import models -from official.vision.image_classification.efficientnet import efficientnet_model +from official.legacy.image_classification.efficientnet import efficientnet_model class ModelsTest(absltest.TestCase): - def setup_get_model(self): - self.batch_size = 64 - self.num_classes = 100 - - def setup_normalize_img_and_label(self): - self.small_img = tf.random.uniform( - shape=[100, 50, 3], maxval=255, dtype=tf.dtypes.int32) - self.big_img = tf.random.uniform( - shape=[300, 500, 3], maxval=255, dtype=tf.dtypes.int32) - self.image_size = 224 - self.width_ratio = 2 - self.expected_img_shape = [self.image_size, - self.image_size * self.width_ratio, - 3] - self.label = tf.convert_to_tensor(4) - - def setup_run(self, remote=True): - if remote: - self.run_return_value = None - else: - self.run_return_value = {'job_id': 'job_id', - 'docker_image': 'docker_image'} - self.run = mock.patch.object( - run, - 'run', - autospec=True, - return_value=self.run_return_value, - ).start() - - self.remote = mock.patch.object( - run, - 'remote', - autospec=True, - return_value=remote, - ).start() - - def setup_run_models(self): - self.classifier_trainer = mock.patch.object( - models, - 'classifier_trainer', - autospec=True, - ).start() - - def setup_run_experiment_cloud(self, file_id): - self.params_file = models._PARAMS_FILE_NAME_FORMAT.format( - file_id) - - self.save_params = mock.patch.object( - models, - 'save_params', - autospec=True, - return_value=self.params_file, - ).start() - - self.copy_entry_point = mock.patch.object( - models, - 'copy_entry_point', - autospec=True, - return_value=models._ENTRY_POINT_FORMAT.format(file_id), - ).start() - - self.remove = mock.patch.object( - os, - 'remove', - autospec=True, - ).start() - - self.uuid4 = mock.patch.object( - uuid, - 'uuid4', - return_value=file_id, - ).start() - - def tearDown(self): - mock.patch.stopall() - super(ModelsTest, self).tearDown() - - def test_get_model_resnet(self): - self.setup_get_model() - resnet = models.get_model('resnet', self.batch_size, self.num_classes) - self.assertEqual('resnet50', resnet._name) - self.assertEqual(self.batch_size, resnet.outputs[0].shape[0]) - self.assertEqual(self.num_classes, resnet.outputs[0].shape[1]) - - def test_get_model_efficientnet_default(self): - self.setup_get_model() - efficientnet = models.get_model('efficientnet', self.batch_size, - self.num_classes) - default_config = efficientnet_model.MODEL_CONFIGS['efficientnet-b0'] - default_config.num_classes = self.num_classes - self.assertEqual(efficientnet.config, default_config) - - def test_get_model_efficientnet(self): - self.setup_get_model() - efficientnet_version = 'efficientnet-b2' - efficientnet = models.get_model(efficientnet_version, self.batch_size, - self.num_classes) - default_config = efficientnet_model.MODEL_CONFIGS[efficientnet_version] - default_config.num_classes = self.num_classes - self.assertEqual(efficientnet.config, default_config) - - def test_get_model_error(self): - self.setup_get_model() - self.assertRaises(TypeError, models.get_model, 'not_a_model', - self.batch_size, self.num_classes) - - def test_normalize_image_and_label_without_one_hot(self): - self.setup_normalize_img_and_label() - 
expected_label = self.label - result_img, result_label = models.normalize_img_and_label( - self.small_img, self.label, self.image_size, self.width_ratio) - self.assertEqual(result_img.shape, self.expected_img_shape) - self.assertEqual(result_label, expected_label) - - def test_normalize_image_and_label_with_one_hot(self): - self.setup_normalize_img_and_label() - num_classes = 10 - expected_label = tf.convert_to_tensor([0, 0, 0, 0, 1, 0, 0, 0, 0, 0], - dtype=tf.dtypes.int8) - result_img, result_label = models.normalize_img_and_label( - self.big_img, self.label, self.image_size, self.width_ratio, - num_classes, True) - self.assertEqual(result_img.shape, self.expected_img_shape) - self.assertTrue((result_label == expected_label).numpy().all()) - - def test_run_models_locally(self): - self.setup_run(remote=False) - self.setup_run_models() - run_kwargs = {'entry_point': 'entry_point', - 'requirements_txt': 'requirements_txt', - 'worker_count': 5,} - result = models.run_models('dataset_name', 'model_name', 'gcs_bucket', - 'train_split', 'validation_split', - **run_kwargs) - self.remote.assert_called() - self.run.assert_called_with(**run_kwargs) - self.classifier_trainer.assert_not_called() - return_keys = ['job_id', 'docker_image', 'tensorboard_logs', - 'model_checkpoint', 'saved_model'] - self.assertListEqual(list(result.keys()), return_keys) - - def test_run_models_remote(self): - self.setup_run() - self.setup_run_models() - result = models.run_models('dataset_name', 'model_name', 'gcs_bucket', - 'train_split', 'validation_split') - self.remote.assert_called() - self.run.assert_not_called() - self.classifier_trainer.assert_called() - - self.assertIsNone(result) - - def test_run_experiment_cloud(self): - self.setup_run(remote=False) - file_id = 'test_id' - self.setup_run_experiment_cloud(file_id) - run_experiment_kwargs = dict() - models.run_experiment_cloud( - run_experiment_kwargs=run_experiment_kwargs) - entry_point = models._ENTRY_POINT_FORMAT.format(file_id) - self.run.assert_called_with(entry_point=entry_point, - distribution_strategy=None) - self.remove.assert_any_call(entry_point) - self.remove.assert_any_call(self.params_file) - - def setup_copy_entry_point(self): - self.get_original_lines = mock.patch.object( - models, - 'get_original_lines', - autospec=True, - return_value=['PARAMS_FILE_NAME = not this', 'do not change'], - ).start() - - self.open = mock.mock_open() - mock.patch( - 'builtins.open', - self.open, - ).start() - - def test_copy_entry_point(self): - self.setup_copy_entry_point() - file_id = 'file_id' - params_file = 'params_file' - models.copy_entry_point(file_id, params_file) - - self.open.assert_called_once_with( - models._ENTRY_POINT_FORMAT.format(file_id), - 'w') - entry_file = self.open() - entry_file.write.assert_any_call( - f"PARAMS_FILE_NAME = '{params_file}'\n") - entry_file.write.assert_any_call('do not change') - - def test_get_distribution_strategy_tpu(self): - run_kwargs = dict( - worker_count=1, - worker_config=machine_config.COMMON_MACHINE_CONFIGS['TPU'],) - strategy = models.get_distribution_strategy_str(run_kwargs) - self.assertEqual('tpu', strategy) - - def test_get_distribution_strategy_multi_mirror(self): - run_kwargs = dict(worker_count=1) - strategy = models.get_distribution_strategy_str(run_kwargs) - self.assertEqual('multi_mirror', strategy) - - def test_get_distribution_strategy_mirror(self): - run_kwargs = dict( - chief_config=machine_config.COMMON_MACHINE_CONFIGS['K80_4X']) - strategy = models.get_distribution_strategy_str(run_kwargs) - 
self.assertEqual('mirror', strategy) - - def test_get_distribution_strategy_one_device(self): - run_kwargs = dict() - strategy = models.get_distribution_strategy_str(run_kwargs) - self.assertEqual('one_device', strategy) + def setup_get_model(self): + self.batch_size = 64 + self.num_classes = 100 + + def setup_normalize_img_and_label(self): + self.small_img = tf.random.uniform( + shape=[100, 50, 3], maxval=255, dtype=tf.dtypes.int32) + self.big_img = tf.random.uniform( + shape=[300, 500, 3], maxval=255, dtype=tf.dtypes.int32) + self.image_size = 224 + self.width_ratio = 2 + self.expected_img_shape = [ + self.image_size, self.image_size * self.width_ratio, 3 + ] + self.label = tf.convert_to_tensor(4) + + def setup_run(self, remote=True): + if remote: + self.run_return_value = None + else: + self.run_return_value = { + 'job_id': 'job_id', + 'docker_image': 'docker_image' + } + self.run = mock.patch.object( + run, + 'run', + autospec=True, + return_value=self.run_return_value, + ).start() + + self.remote = mock.patch.object( + run, + 'remote', + autospec=True, + return_value=remote, + ).start() + + def setup_run_models(self): + self.classifier_trainer = mock.patch.object( + models, + 'classifier_trainer', + autospec=True, + ).start() + + def setup_run_experiment_cloud(self, file_id): + self.params_file = models._PARAMS_FILE_NAME_FORMAT.format(file_id) + + self.save_params = mock.patch.object( + models, + 'save_params', + autospec=True, + return_value=self.params_file, + ).start() + + self.copy_entry_point = mock.patch.object( + models, + 'copy_entry_point', + autospec=True, + return_value=models._ENTRY_POINT_FORMAT.format(file_id), + ).start() + + self.remove = mock.patch.object( + os, + 'remove', + autospec=True, + ).start() + + self.uuid4 = mock.patch.object( + uuid, + 'uuid4', + return_value=file_id, + ).start() + + def tearDown(self): + mock.patch.stopall() + super(ModelsTest, self).tearDown() + + def test_get_model_resnet(self): + self.setup_get_model() + resnet = models.get_model('resnet', self.batch_size, self.num_classes) + self.assertEqual('resnet50', resnet._name) + self.assertEqual(self.batch_size, resnet.outputs[0].shape[0]) + self.assertEqual(self.num_classes, resnet.outputs[0].shape[1]) + + def test_get_model_efficientnet_default(self): + self.setup_get_model() + efficientnet = models.get_model('efficientnet', self.batch_size, + self.num_classes) + default_config = efficientnet_model.MODEL_CONFIGS['efficientnet-b0'] + default_config.num_classes = self.num_classes + self.assertEqual(efficientnet.config, default_config) + + def test_get_model_efficientnet(self): + self.setup_get_model() + efficientnet_version = 'efficientnet-b2' + efficientnet = models.get_model(efficientnet_version, self.batch_size, + self.num_classes) + default_config = efficientnet_model.MODEL_CONFIGS[efficientnet_version] + default_config.num_classes = self.num_classes + self.assertEqual(efficientnet.config, default_config) + + def test_get_model_error(self): + self.setup_get_model() + self.assertRaises(TypeError, models.get_model, 'not_a_model', + self.batch_size, self.num_classes) + + def test_normalize_image_and_label_without_one_hot(self): + self.setup_normalize_img_and_label() + expected_label = self.label + result_img, result_label = models.normalize_img_and_label( + self.small_img, self.label, self.image_size, self.width_ratio) + self.assertEqual(result_img.shape, self.expected_img_shape) + self.assertEqual(result_label, expected_label) + + def test_normalize_image_and_label_with_one_hot(self): + 
self.setup_normalize_img_and_label() + num_classes = 10 + expected_label = tf.convert_to_tensor([0, 0, 0, 0, 1, 0, 0, 0, 0, 0], + dtype=tf.dtypes.int8) + result_img, result_label = models.normalize_img_and_label( + self.big_img, self.label, self.image_size, self.width_ratio, + num_classes, True) + self.assertEqual(result_img.shape, self.expected_img_shape) + self.assertTrue((result_label == expected_label).numpy().all()) + + def test_run_models_locally(self): + self.setup_run(remote=False) + self.setup_run_models() + run_kwargs = { + 'entry_point': 'entry_point', + 'requirements_txt': 'requirements_txt', + 'worker_count': 5, + } + result = models.run_models('dataset_name', 'model_name', 'gcs_bucket', + 'train_split', 'validation_split', **run_kwargs) + self.remote.assert_called() + self.run.assert_called_with(**run_kwargs) + self.classifier_trainer.assert_not_called() + return_keys = [ + 'job_id', 'docker_image', 'tensorboard_logs', 'model_checkpoint', + 'saved_model' + ] + self.assertListEqual(list(result.keys()), return_keys) + + def test_run_models_remote(self): + self.setup_run() + self.setup_run_models() + result = models.run_models('dataset_name', 'model_name', 'gcs_bucket', + 'train_split', 'validation_split') + self.remote.assert_called() + self.run.assert_not_called() + self.classifier_trainer.assert_called() + + self.assertIsNone(result) + + def test_run_experiment_cloud(self): + self.setup_run(remote=False) + file_id = 'test_id' + self.setup_run_experiment_cloud(file_id) + run_experiment_kwargs = dict() + models.run_experiment_cloud(run_experiment_kwargs=run_experiment_kwargs) + entry_point = models._ENTRY_POINT_FORMAT.format(file_id) + self.run.assert_called_with( + entry_point=entry_point, distribution_strategy=None) + self.remove.assert_any_call(entry_point) + self.remove.assert_any_call(self.params_file) + + def setup_copy_entry_point(self): + self.get_original_lines = mock.patch.object( + models, + 'get_original_lines', + autospec=True, + return_value=['PARAMS_FILE_NAME = not this', 'do not change'], + ).start() + + self.open = mock.mock_open() + mock.patch( + 'builtins.open', + self.open, + ).start() + + def test_copy_entry_point(self): + self.setup_copy_entry_point() + file_id = 'file_id' + params_file = 'params_file' + models.copy_entry_point(file_id, params_file) + + self.open.assert_called_once_with( + models._ENTRY_POINT_FORMAT.format(file_id), 'w') + entry_file = self.open() + entry_file.write.assert_any_call(f"PARAMS_FILE_NAME = '{params_file}'\n") + entry_file.write.assert_any_call('do not change') + + def test_get_distribution_strategy_tpu(self): + run_kwargs = dict( + worker_count=1, + worker_config=machine_config.COMMON_MACHINE_CONFIGS['TPU'], + ) + strategy = models.get_distribution_strategy_str(run_kwargs) + self.assertEqual('tpu', strategy) + + def test_get_distribution_strategy_multi_mirror(self): + run_kwargs = dict(worker_count=1) + strategy = models.get_distribution_strategy_str(run_kwargs) + self.assertEqual('multi_mirror', strategy) + + def test_get_distribution_strategy_mirror(self): + run_kwargs = dict( + chief_config=machine_config.COMMON_MACHINE_CONFIGS['K80_4X']) + strategy = models.get_distribution_strategy_str(run_kwargs) + self.assertEqual('mirror', strategy) + + def test_get_distribution_strategy_one_device(self): + run_kwargs = dict() + strategy = models.get_distribution_strategy_str(run_kwargs) + self.assertEqual('one_device', strategy) if __name__ == '__main__': absltest.main()
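Post-patch note (not part of the commit): the change above only moves the TF Model
Garden imports from official.vision.image_classification to
official.legacy.image_classification and pins keras==2.6.* alongside them; the
public wrappers behave as before. For orientation, here is a minimal, hypothetical
call to the run_models wrapper this patch touches. The dataset, bucket, and job
name below are illustrative placeholders, not values from the patch:

    from tensorflow_cloud.core.experimental import models

    # Trains ResNet-50 on CIFAR-10 from TFDS. Per get_model_dirs(), artifacts
    # land under gs://my-bucket/resnet_demo/{logs,checkpoints,saved_model}.
    results = models.run_models(
        dataset_name='cifar10',    # any TFDS image classification dataset
        model_name='resnet',       # or 'efficientnet', or a MODEL_CONFIGS key
        gcs_bucket='my-bucket',    # placeholder bucket name
        train_split='train',
        validation_split='test',
        one_hot=True,              # labels are indices; the loss is CategoricalCrossentropy
        epochs=10,
        batch_size=128,
        job_name='resnet_demo')
    # On the client, results has the five documented keys: 'job_id',
    # 'docker_image', 'tensorboard_logs', 'model_checkpoint', 'saved_model'.
    # Inside the remote run, run_models trains and returns None.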
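Likewise, the distribution-strategy selection that run_experiment_cloud relies on
is a pure function and easy to sanity-check locally; this sketch just restates the
cases covered by the models_test.py tests above:

    from tensorflow_cloud.core import machine_config
    from tensorflow_cloud.core.experimental import models

    tpu = machine_config.COMMON_MACHINE_CONFIGS['TPU']
    four_gpus = machine_config.COMMON_MACHINE_CONFIGS['K80_4X']

    # TPU workers take precedence, then any multi-worker setup,
    # then a multi-accelerator chief, then a single device.
    assert models.get_distribution_strategy_str(
        dict(worker_count=1, worker_config=tpu)) == 'tpu'
    assert models.get_distribution_strategy_str(
        dict(worker_count=1)) == 'multi_mirror'
    assert models.get_distribution_strategy_str(
        dict(chief_config=four_gpus)) == 'mirror'
    assert models.get_distribution_strategy_str(dict()) == 'one_device'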