From 05035482a9cc0dbf75ec990833bf4e95f716df4b Mon Sep 17 00:00:00 2001
From: Brian Wilson
Date: Tue, 4 Feb 2020 16:32:17 -0500
Subject: [PATCH 1/8] Add CopyMysqlDatabaseFromS3ToS3Task.

---
 .../load_internal_reporting_database.py | 103 ++++++++++++++++++
 1 file changed, 103 insertions(+)

diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
index 8caae7d0ec..f1c5dc1f9c 100644
--- a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
+++ b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
@@ -17,6 +17,8 @@
 from edx.analytics.tasks.common.vertica_load import SchemaManagementTask, VerticaCopyTask
 from edx.analytics.tasks.util.hive import HivePartition, WarehouseMixin
 from edx.analytics.tasks.util.url import ExternalURL, get_target_from_url, url_path_join
+from edx.analytics.tasks.util.s3_util import ScalableS3Client
+
 try:
     from google.cloud.bigquery import SchemaField
@@ -1060,3 +1062,104 @@ def complete(self):
         """
         # OverwriteOutputMixin changes the complete() method behavior, so we override it.
         return all(r.complete() for r in luigi.task.flatten(self.requires()))
+
+
+class CopyMysqlDatabaseFromS3ToS3Task(WarehouseMixin, luigi.WrapperTask):
+    """
+    Provides entry point for copying a MySQL database destined for Snowflake from one location in S3 to another.
+    """
+    date = luigi.DateParameter(
+        default=datetime.datetime.utcnow().date(),
+    )
+    database = luigi.Parameter(
+        description='Name of database as stored in S3.'
+    )
+    warehouse_subdirectory = luigi.Parameter(
+        default='import_mysql_to_vertica',
+        description='Subdirectory under warehouse_path to store intermediate data.'
+    )
+    new_warehouse_path = luigi.Parameter(
+        description='The warehouse_path URL to which to copy database data.'
+    )
+
+    def __init__(self, *args, **kwargs):
+        """
+        Inits this Luigi task.
+        """
+        super(CopyMysqlDatabaseFromS3ToS3Task, self).__init__(*args, **kwargs)
+        self.metadata = None
+        self.s3_client = ScalableS3Client()
+
+    @property
+    def database_metadata(self):
+        if self.metadata is None:
+            metadata_target = self.get_schema_metadata_target()
+            with metadata_target.open('r') as metadata_file:
+                self.metadata = json.load(metadata_file)
+        return self.metadata
+
+    def get_schema_metadata_target(self):
+        partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
+        metadata_location = url_path_join(
+            self.warehouse_path,
+            self.warehouse_subdirectory,
+            self.database,
+            DUMP_METADATA_OUTPUT,
+            partition_path_spec,
+            METADATA_FILENAME,
+        )
+        return get_target_from_url(metadata_location)
+
+    def get_table_list_for_database(self):
+        return self.database_metadata['table_list']
+
+    def output(self):
+        partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
+        metadata_location = url_path_join(
+            self.new_warehouse_path,
+            self.warehouse_subdirectory,
+            self.database,
+            DUMP_METADATA_OUTPUT,
+            partition_path_spec,
+            METADATA_FILENAME,
+        )
+        return get_target_from_url(metadata_location)
+
+    def copy_table(self, table_name):
+        partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
+        source_path = url_path_join(
+            self.warehouse_path,
+            self.warehouse_subdirectory,
+            self.database,
+            table_name,
+            partition_path_spec,
+        )
+        destination_path = url_path_join(
+            self.new_warehouse_path,
+            self.warehouse_subdirectory,
+            self.database,
+            table_name,
+            partition_path_spec,
+        )
+        kwargs = {}
+        # From boto doc: "If True, the ACL from the source key will be
+        # copied to the destination key. If False, the destination key
+        # will have the default ACL. Note that preserving the ACL in
+        # the new key object will require two additional API calls to
+        # S3, one to retrieve the current ACL and one to set that ACL
+        # on the new object. If you don't care about the ACL, a value
+        # of False will be significantly more efficient."
+        # kwargs['preserve_acl'] = True;
+        self.s3_client.copy(source_path, destination_path, **kwargs)
+
+    def copy_metadata_file(self):
+        self.copy_table(DUMP_METADATA_OUTPUT)
+
+    def run(self):
+        if self.new_warehouse_path == self.warehouse_path:
+            raise Exception("Must set new_warehouse_path {} to be different than warehouse_path {}".format(self.new_warehouse_path, self.warehouse_path))
+
+        for table_name in self.get_table_list_for_database():
+            self.copy_table(table_name)
+
+        self.copy_metadata_file()

From f0e2bb31625eaa5c68ad73aad6c8a317be2b3ec2 Mon Sep 17 00:00:00 2001
From: Brian Wilson
Date: Tue, 4 Feb 2020 17:16:26 -0500
Subject: [PATCH 2/8] Use task instead of wrapper task.

---
 .../tasks/warehouse/load_internal_reporting_database.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
index f1c5dc1f9c..4646413594 100644
--- a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
+++ b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
@@ -1064,7 +1064,7 @@ def complete(self):
         return all(r.complete() for r in luigi.task.flatten(self.requires()))
 
 
-class CopyMysqlDatabaseFromS3ToS3Task(WarehouseMixin, luigi.WrapperTask):
+class CopyMysqlDatabaseFromS3ToS3Task(WarehouseMixin, luigi.Task):
     """
     Provides entry point for copying a MySQL database destined for Snowflake from one location in S3 to another.
     """
@@ -1113,6 +1113,9 @@ def get_schema_metadata_target(self):
     def get_table_list_for_database(self):
         return self.database_metadata['table_list']
 
+    def requires(self):
+        return ExternalURL(self.get_schema_metadata_target().path)
+
     def output(self):
         partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
         metadata_location = url_path_join(

From 620b40d8e9aac07e0a7b430fc231b32888012fd2 Mon Sep 17 00:00:00 2001
From: Brian Wilson
Date: Tue, 4 Feb 2020 18:01:52 -0500
Subject: [PATCH 3/8] try setting signatureVersion.

---
 .../tasks/warehouse/load_internal_reporting_database.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
index 4646413594..6fcedb0d24 100644
--- a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
+++ b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
@@ -1088,7 +1088,9 @@ def __init__(self, *args, **kwargs):
         """
         super(CopyMysqlDatabaseFromS3ToS3Task, self).__init__(*args, **kwargs)
         self.metadata = None
-        self.s3_client = ScalableS3Client()
+        self.s3_client = ScalableS3Client(
+            signatureVersion='v4',
+        )
 
     @property
     def database_metadata(self):
@@ -1153,6 +1155,7 @@ def copy_table(self, table_name):
         # on the new object. If you don't care about the ACL, a value
         # of False will be significantly more efficient."
         # kwargs['preserve_acl'] = True;
+
         self.s3_client.copy(source_path, destination_path, **kwargs)
 
     def copy_metadata_file(self):

From 6ed4cc37d1906fe2ec60e1778e7694c26dab2bc0 Mon Sep 17 00:00:00 2001
From: Brian Wilson
Date: Tue, 4 Feb 2020 18:17:30 -0500
Subject: [PATCH 4/8] set part_size to ~3GB (from 64MB).

---
 .../tasks/warehouse/load_internal_reporting_database.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
index 6fcedb0d24..464ee7b921 100644
--- a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
+++ b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
@@ -1088,9 +1088,7 @@ def __init__(self, *args, **kwargs):
         """
         super(CopyMysqlDatabaseFromS3ToS3Task, self).__init__(*args, **kwargs)
         self.metadata = None
-        self.s3_client = ScalableS3Client(
-            signatureVersion='v4',
-        )
+        self.s3_client = ScalableS3Client()
 
     @property
     def database_metadata(self):
@@ -1156,7 +1154,7 @@ def copy_table(self, table_name):
         # of False will be significantly more efficient."
         # kwargs['preserve_acl'] = True;
 
-        self.s3_client.copy(source_path, destination_path, **kwargs)
+        self.s3_client.copy(source_path, destination_path, part_size=3000000000, **kwargs)
 
     def copy_metadata_file(self):

From e9102f2dbe75cf0e6580b2ad91abda2749e6d490 Mon Sep 17 00:00:00 2001
From: Brian Wilson
Date: Wed, 5 Feb 2020 14:06:16 -0500
Subject: [PATCH 5/8] Try running aws s3 in a subprocess.
---
 .../warehouse/load_internal_reporting_database.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
index 464ee7b921..26659d5ffa 100644
--- a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
+++ b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
@@ -5,6 +5,7 @@
 import json
 import logging
 import re
+import subprocess
 
 import luigi
@@ -1088,7 +1089,7 @@ def __init__(self, *args, **kwargs):
         """
         super(CopyMysqlDatabaseFromS3ToS3Task, self).__init__(*args, **kwargs)
         self.metadata = None
-        self.s3_client = ScalableS3Client()
+        # self.s3_client = ScalableS3Client()
 
     @property
     def database_metadata(self):
@@ -1154,7 +1155,16 @@ def copy_table(self, table_name):
         # of False will be significantly more efficient."
         # kwargs['preserve_acl'] = True;
 
-        self.s3_client.copy(source_path, destination_path, part_size=3000000000, **kwargs)
+        # self.s3_client.copy(source_path, destination_path, part_size=3000000000, **kwargs)
+        command = 'aws s3 cp {source_path} {destination_path} --recursive'.format(
+            source_path=source_path, destination_path=destination_path
+        )
+        try:
+            log.info("Calling '{}'".format(command))
+            return_val = subprocess.check_call(command, shell=True)
+            log.info("Call returned '{}'".format(return_val))
+        except subprocess.CalledProcessError as exception:
+            raise
 
     def copy_metadata_file(self):

From 56751adab6ffeeff58f332f5c9968e86de7370d3 Mon Sep 17 00:00:00 2001
From: Brian Wilson
Date: Wed, 5 Feb 2020 14:44:34 -0500
Subject: [PATCH 6/8] Add awscli==1.14.46

---
 requirements/default.txt | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/requirements/default.txt b/requirements/default.txt
index bbb0bcc7e2..17fe150b26 100644
--- a/requirements/default.txt
+++ b/requirements/default.txt
@@ -99,3 +99,7 @@ yarn-api-client==0.2.3
 
 # The following packages are considered to be unsafe in a requirements file:
 # setuptools==41.4.0    # via google-cloud-core, protobuf, python-daemon
+
+# Add this here for the copy S3 task as a standalone hack.
+# But pick a version that matches the boto requirements above...
+awscli==1.14.46

From 1cd167e8ec5d1dd9e16e26426dbf1f9e49b37703 Mon Sep 17 00:00:00 2001
From: Brian Wilson
Date: Wed, 5 Feb 2020 17:15:53 -0500
Subject: [PATCH 7/8] Add s3_to_s3 copy for Vertica schema output too.

---
 edx/analytics/tasks/common/vertica_export.py | 95 +++++++++++++++++++
 .../load_internal_reporting_database.py      | 26 +++--
 2 files changed, 106 insertions(+), 15 deletions(-)

diff --git a/edx/analytics/tasks/common/vertica_export.py b/edx/analytics/tasks/common/vertica_export.py
index be439858ab..4435771557 100644
--- a/edx/analytics/tasks/common/vertica_export.py
+++ b/edx/analytics/tasks/common/vertica_export.py
@@ -5,6 +5,7 @@
 import json
 import logging
 import re
+import subprocess
 
 import luigi
@@ -368,3 +369,97 @@ def run(self):
         with self.output().open('w') as metadata_file:
             json.dump(metadata, metadata_file)
+
+
+class CopyVerticaDatabaseFromS3ToS3Task(VerticaExportMixin, luigi.Task):
+    """
+    Provides entry point for copying a Vertica schema export destined for Snowflake from one location in S3 to another.
+    """
+    new_warehouse_path = luigi.Parameter(
+        description='The warehouse_path URL to which to copy database data.'
+    )
+    # remove parameters that are not needed (or shouldn't be used).
+    vertica_credentials = None
+    intermediate_warehouse_path = None
+
+    def __init__(self, *args, **kwargs):
+        """
+        Inits this Luigi task.
+        """
+        super(CopyVerticaDatabaseFromS3ToS3Task, self).__init__(*args, **kwargs)
+        self.metadata = None
+
+    @property
+    def schema_metadata(self):
+        if self.metadata is None:
+            metadata_target = self.get_schema_metadata_target()
+            with metadata_target.open('r') as metadata_file:
+                self.metadata = json.load(metadata_file)
+        return self.metadata
+
+    def get_table_list_for_database(self):
+        return self.schema_metadata['table_list']
+
+    def requires(self):
+        return ExternalURL(self.get_schema_metadata_target().path)
+
+    def output(self):
+        partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
+        metadata_location = url_path_join(
+            self.new_warehouse_path,
+            'import/vertica/sqoop/',
+            self.vertica_warehouse_name,
+            self.vertica_schema_name,
+            '_metadata_export_schema',
+            partition_path_spec,
+            '_metadata'
+        )
+        return get_target_from_url(metadata_location)
+
+    def copy_table(self, table_name):
+        partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
+        source_path = url_path_join(
+            self.warehouse_path,
+            'import/vertica/sqoop/',
+            self.vertica_warehouse_name,
+            self.vertica_schema_name,
+            table_name,
+            partition_path_spec,
+        )
+        destination_path = url_path_join(
+            self.new_warehouse_path,
+            'import/vertica/sqoop/',
+            self.vertica_warehouse_name,
+            self.vertica_schema_name,
+            table_name,
+            partition_path_spec,
+        )
+        kwargs = {}
+        # First attempt was to create a ScalableS3Client() wrapper in __init__, then calling:
+        # self.s3_client.copy(source_path, destination_path, part_size=3000000000, **kwargs)
+        # This succeeded when files were small enough to be below the part_size, but there were
+        # files for LMS and ecommerce that exceeded this limit (i.e. by a lot), and multi-part
+        # uploads were failing due to "SignatureDoesNotMatch" errors from S3. Since we're using
+        # older boto code instead of boto3 (which requires a Luigi upgrade), it seemed easier to
+        # just install awscli and use that in a subprocess to copy each table's data.
+        command = 'aws s3 cp {source_path} {destination_path} --recursive'.format(
+            source_path=source_path, destination_path=destination_path
+        )
+        try:
+            log.info("Calling '{}'".format(command))
+            return_val = subprocess.check_call(command, shell=True)
+            log.info("Call returned '{}'".format(return_val))
+        except subprocess.CalledProcessError as exception:
+            raise
+
+    def copy_metadata_file(self):
+        self.copy_table('_metadata_export_schema')
+
+    def run(self):
+        if self.new_warehouse_path == self.warehouse_path:
+            raise Exception("Must set new_warehouse_path {} to be different than warehouse_path {}".format(self.new_warehouse_path, self.warehouse_path))
+
+        for table_name in self.get_table_list_for_database():
+            self.copy_table(table_name)
+
+        self.copy_metadata_file()
diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
index 26659d5ffa..9fd5d59218 100644
--- a/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
+++ b/edx/analytics/tasks/warehouse/load_internal_reporting_database.py
@@ -1010,7 +1010,7 @@ def __init__(self, *args, **kwargs):
         Inits this Luigi task.
         """
         super(ImportMysqlDatabaseFromS3ToSnowflakeSchemaTask, self).__init__(*args, **kwargs)
-        metadata_target = self.get_schema_metadata_target()
+        metadata_target = self.get_database_metadata_target()
         with metadata_target.open('r') as metadata_file:
             self.metadata = json.load(metadata_file)
 
@@ -1019,7 +1019,7 @@ def get_table_list_for_database(self):
         return self.metadata['table_list']
 
-    def get_schema_metadata_target(self):
+    def get_database_metadata_target(self):
         partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
         metadata_location = url_path_join(
             self.warehouse_path,
@@ -1089,17 +1089,16 @@ def __init__(self, *args, **kwargs):
         """
         super(CopyMysqlDatabaseFromS3ToS3Task, self).__init__(*args, **kwargs)
         self.metadata = None
-        # self.s3_client = ScalableS3Client()
 
     @property
     def database_metadata(self):
         if self.metadata is None:
-            metadata_target = self.get_schema_metadata_target()
+            metadata_target = self.get_database_metadata_target()
             with metadata_target.open('r') as metadata_file:
                 self.metadata = json.load(metadata_file)
         return self.metadata
 
-    def get_schema_metadata_target(self):
+    def get_database_metadata_target(self):
         partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
         metadata_location = url_path_join(
             self.warehouse_path,
@@ -1115,7 +1114,7 @@ def get_table_list_for_database(self):
         return self.database_metadata['table_list']
 
     def requires(self):
-        return ExternalURL(self.get_schema_metadata_target().path)
+        return ExternalURL(self.get_database_metadata_target().path)
 
     def output(self):
         partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
         metadata_location = url_path_join(
@@ -1146,16 +1145,13 @@ def copy_table(self, table_name):
             partition_path_spec,
         )
         kwargs = {}
-        # From boto doc: "If True, the ACL from the source key will be
-        # copied to the destination key. If False, the destination key
-        # will have the default ACL. Note that preserving the ACL in
-        # the new key object will require two additional API calls to
-        # S3, one to retrieve the current ACL and one to set that ACL
-        # on the new object. If you don't care about the ACL, a value
-        # of False will be significantly more efficient."
-        # kwargs['preserve_acl'] = True;
-
+        # First attempt was to create a ScalableS3Client() wrapper in __init__, then calling:
         # self.s3_client.copy(source_path, destination_path, part_size=3000000000, **kwargs)
+        # This succeeded when files were small enough to be below the part_size, but there were
+        # files for LMS and ecommerce that exceeded this limit (i.e. by a lot), and multi-part
+        # uploads were failing due to "SignatureDoesNotMatch" errors from S3. Since we're using
+        # older boto code instead of boto3 (which requires a Luigi upgrade), it seemed easier to
+        # just install awscli and use that in a subprocess to copy each table's data.
         command = 'aws s3 cp {source_path} {destination_path} --recursive'.format(
             source_path=source_path, destination_path=destination_path
         )

From 419a7b885337c3c2536c23bf12e4220572d096f8 Mon Sep 17 00:00:00 2001
From: Brian Wilson
Date: Thu, 6 Feb 2020 15:57:57 -0500
Subject: [PATCH 8/8] Change Vertica names from database to schema.

---
 edx/analytics/tasks/common/vertica_export.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/edx/analytics/tasks/common/vertica_export.py b/edx/analytics/tasks/common/vertica_export.py
index 4435771557..fc2f318c1d 100644
--- a/edx/analytics/tasks/common/vertica_export.py
+++ b/edx/analytics/tasks/common/vertica_export.py
@@ -371,7 +371,7 @@ def run(self):
             json.dump(metadata, metadata_file)
 
 
-class CopyVerticaDatabaseFromS3ToS3Task(VerticaExportMixin, luigi.Task):
+class CopyVerticaSchemaFromS3ToS3Task(VerticaExportMixin, luigi.Task):
     """
     Provides entry point for copying a Vertica schema export destined for Snowflake from one location in S3 to another.
     """
@@ -386,7 +386,7 @@ def __init__(self, *args, **kwargs):
         """
         Inits this Luigi task.
         """
-        super(CopyVerticaDatabaseFromS3ToS3Task, self).__init__(*args, **kwargs)
+        super(CopyVerticaSchemaFromS3ToS3Task, self).__init__(*args, **kwargs)
         self.metadata = None
 
     @property
@@ -397,7 +397,7 @@ def schema_metadata(self):
             self.metadata = json.load(metadata_file)
         return self.metadata
 
-    def get_table_list_for_database(self):
+    def get_table_list_for_schema(self):
         return self.schema_metadata['table_list']
 
     def requires(self):
@@ -459,7 +459,7 @@ def run(self):
         if self.new_warehouse_path == self.warehouse_path:
             raise Exception("Must set new_warehouse_path {} to be different than warehouse_path {}".format(self.new_warehouse_path, self.warehouse_path))
 
-        for table_name in self.get_table_list_for_database():
+        for table_name in self.get_table_list_for_schema():
             self.copy_table(table_name)
 
         self.copy_metadata_file()
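
The two copy tasks in this series share the same core flow: read the table list
from the dump's metadata file, copy each table's prefix with the AWS CLI, and
copy the metadata directory last so that its presence at the destination marks
the copy as complete. A simplified standalone sketch of that flow, outside of
Luigi and with made-up bucket names and metadata paths standing in for the real
warehouse layout (the tasks themselves build these from warehouse_path and
constants like DUMP_METADATA_OUTPUT), looks roughly like this:

    import json
    import subprocess

    SOURCE = 's3://example-old-warehouse/import_mysql_to_vertica/mydb'
    DESTINATION = 's3://example-new-warehouse/import_mysql_to_vertica/mydb'
    PARTITION = 'dt=2020-02-05'

    def copy_prefix(name):
        # Copy every object under one table (or metadata) prefix.
        command = 'aws s3 cp {src}/{name}/{part}/ {dst}/{name}/{part}/ --recursive'.format(
            src=SOURCE, dst=DESTINATION, name=name, part=PARTITION
        )
        subprocess.check_call(command, shell=True)

    def main():
        if SOURCE == DESTINATION:
            raise Exception('Source and destination warehouse paths must differ.')
        # Stream the dump metadata file to stdout to get the list of tables to copy.
        metadata_url = '{src}/_metadata_dump/{part}/_metadata'.format(src=SOURCE, part=PARTITION)
        raw = subprocess.check_output('aws s3 cp {} -'.format(metadata_url), shell=True)
        metadata = json.loads(raw.decode('utf-8'))
        for table_name in metadata['table_list']:
            copy_prefix(table_name)
        copy_prefix('_metadata_dump')  # copy the metadata directory last

    if __name__ == '__main__':
        main()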