diff --git a/mdl/pom.xml b/mdl/pom.xml index 408adb52..5ef2e253 100644 --- a/mdl/pom.xml +++ b/mdl/pom.xml @@ -22,7 +22,7 @@ herd-mdl org.finra.herd-mdl - 1.4.0 + 1.5.0 diff --git a/metastor/managedObjectLoader/pom.xml b/metastor/managedObjectLoader/pom.xml index cf269001..8d2455f6 100644 --- a/metastor/managedObjectLoader/pom.xml +++ b/metastor/managedObjectLoader/pom.xml @@ -19,7 +19,7 @@ metastore org.finra.herd-mdl.metastore - 1.2.38 + 1.2.43 4.0.0 diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/datamgmt/DataMgmtSvc.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/datamgmt/DataMgmtSvc.java index d4fb028f..b3ada3db 100644 --- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/datamgmt/DataMgmtSvc.java +++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/datamgmt/DataMgmtSvc.java @@ -32,6 +32,7 @@ import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; import java.util.List; import java.util.Map; @@ -44,204 +45,231 @@ @Slf4j public class DataMgmtSvc { - @Value("${AGS}") - private String ags; + @Value( "${AGS}" ) + private String ags; - @Value("${CLUSTER_DEF_NAME}") - String clusterDef; + @Value( "${CLUSTER_DEF_NAME}" ) + String clusterDef; - @Value("${CLUSTER_DEF_NAME_STATS}") - String clusterDefNameStats; + @Value( "${CLUSTER_DEF_NAME_STATS}" ) + String clusterDefNameStats; - @Autowired - ApiClient dmApiClient; + @Autowired + ApiClient dmApiClient; - @Autowired - BusinessObjectDataApi businessObjectDataApi; + @Autowired + BusinessObjectDataApi businessObjectDataApi; - static { - javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier( - (hostname, sslSession) -> true); - } - - public String getTableSchema(org.finra.herd.metastore.managed.JobDefinition jd, boolean replaceColumn) throws ApiException { - - BusinessObjectFormatApi businessObjectFormatApi = new BusinessObjectFormatApi(dmApiClient); - - BusinessObjectFormatDdlRequest request = new BusinessObjectFormatDdlRequest(); - request.setBusinessObjectDefinitionName(jd.getActualObjectName()); - request.setBusinessObjectFormatFileType(jd.getObjectDefinition().getFileType()); - request.setBusinessObjectFormatUsage(jd.getObjectDefinition().getUsageCode()); - - request.setNamespace(jd.getObjectDefinition().getNameSpace()); - request.setIncludeDropTableStatement(false); - request.setIncludeIfNotExistsOption(!replaceColumn); - request.setOutputFormat(BusinessObjectFormatDdlRequest.OutputFormatEnum.HIVE_13_DDL); - request.setReplaceColumns(replaceColumn); - - request.setTableName(jd.getTableName()); - - BusinessObjectFormatDdl ddl = businessObjectFormatApi.businessObjectFormatGenerateBusinessObjectFormatDdl(request); - - return ddl.getDdl(); - } - - public BusinessObjectFormat getDMFormat(org.finra.herd.metastore.managed.JobDefinition jd) throws ApiException { - - BusinessObjectFormatApi businessObjectFormatApi = new BusinessObjectFormatApi(dmApiClient); - - BusinessObjectFormat format = businessObjectFormatApi.businessObjectFormatGetBusinessObjectFormat(jd.getObjectDefinition().getNameSpace(), - jd.getActualObjectName(), jd.getObjectDefinition().getUsageCode(), jd.getObjectDefinition().getFileType(), null); - - return format; - } - - public BusinessObjectDataDdl getBusinessObjectDataDdl(org.finra.herd.metastore.managed.JobDefinition jd, List partitions) throws ApiException { - BusinessObjectDataDdlRequest request = new BusinessObjectDataDdlRequest(); - - request.setIncludeDropTableStatement(false); - request.setOutputFormat(BusinessObjectDataDdlRequest.OutputFormatEnum.HIVE_13_DDL); - request.setBusinessObjectFormatUsage(jd.getObjectDefinition().getUsageCode()); - request.setBusinessObjectFormatFileType(jd.getObjectDefinition().getFileType()); - request.setBusinessObjectDefinitionName(jd.getObjectDefinition().getObjectName()); - request.setAllowMissingData(true); - request.setIncludeDropPartitions(true); - request.setIncludeIfNotExistsOption(true); - request.setTableName(jd.getTableName()); - - List partitionValueFilters = Lists.newArrayList(); - - log.info("PartitionKey: {} \t Partitions: {}", jd.getPartitionKey(), partitions); - if (MetastoreUtil.isPartitionedSingleton(jd.getWfType(), jd.getPartitionKey())) { - addPartitionedSingletonFilter(jd, partitionValueFilters); - } else { - - if (MetastoreUtil.isNonPartitionedSingleton(jd.getWfType(), jd.getPartitionKey())) { - addPartitionFilter(jd.getPartitionKey(), Lists.newArrayList("none"), partitionValueFilters); - } else { - if (jd.isSubPartitionLevelProcessing()) { - addSubPartitionFilter(jd, partitionValueFilters); - } else { - addPartitionFilter(jd.getPartitionKey(), partitions, partitionValueFilters); - } - } - } - - request.setPartitionValueFilter(null); - request.setPartitionValueFilters(partitionValueFilters); - request.setNamespace(jd.getObjectDefinition().getNameSpace()); - - log.info("Get BO DDL Request: \n{}", request.toString()); - return businessObjectDataApi.businessObjectDataGenerateBusinessObjectDataDdl(request); - } - - private void addPartitionFilter(String partitionKey, List partitions, List partitionValueFilters) { - PartitionValueFilter filter = new PartitionValueFilter(); - filter.setPartitionKey(partitionKey); - filter.setPartitionValues(partitions); - partitionValueFilters.add(filter); - } - - private void addPartitionedSingletonFilter(JobDefinition jd, List partitionValueFilters) { - Calendar c = Calendar.getInstance(); - c.add(Calendar.DATE, 1); - String date = new SimpleDateFormat("YYYY-MM-dd").format(c.getTime()); - LatestBeforePartitionValue value = new LatestBeforePartitionValue(); - value.setPartitionValue(date); - - PartitionValueFilter filter = new PartitionValueFilter(); - filter.setPartitionKey(jd.getPartitionKey()); - filter.setLatestBeforePartitionValue(value); - filter.setPartitionValues(null); - filter.setLatestAfterPartitionValue(null); - partitionValueFilters.add(filter); - } - - /** - * To partition filter with sub partitions - * - * @param jd - * @param partitionValueFilters - * @throws ApiException - */ - private void addSubPartitionFilter(JobDefinition jd, List partitionValueFilters) throws ApiException { - List partitionKeys = getDMFormat(jd).getSchema().getPartitions(); - log.info("Partition Keys {} for {}", partitionKeys, jd.getTableName()); - Map partitionKeyValues = Maps.newLinkedHashMap(); - - IntStream.range(0, jd.getPartitionValues().size()) - .forEach(i -> { - String partitionKey = partitionKeys.get(i).getName(); - String partitionValue = jd.getPartitionValues().get(i); - log.info("Partition Key: {}\t Value: {}", partitionKey, partitionValue); - partitionKeyValues.put(partitionKey, partitionValue); - - addPartitionFilter(partitionKey, Lists.newArrayList(partitionValue), partitionValueFilters); - }); - - jd.setPartitionsKeyValue(partitionKeyValues); - } - - public BusinessObjectFormatKeys getBOAllFormatVersions(org.finra.herd.metastore.managed.JobDefinition od, boolean latestBusinessObjectFormatVersion) throws ApiException { - - return new BusinessObjectFormatApi(dmApiClient) - .businessObjectFormatGetBusinessObjectFormats( - od.getObjectDefinition().getNameSpace() - , od.getObjectDefinition().getObjectName() - , latestBusinessObjectFormatVersion); - } - - public BusinessObjectDataNotificationRegistrationKeys getBORegisteredNotification(org.finra.herd.metastore.managed.JobDefinition od) throws ApiException { - return new BusinessObjectDataNotificationRegistrationApi(dmApiClient) - .businessObjectDataNotificationRegistrationGetBusinessObjectDataNotificationRegistrationsByNotificationFilter( - od.getObjectDefinition().getNameSpace() - , od.getObjectDefinition().getObjectName() - , od.getObjectDefinition().getUsageCode() - , od.getObjectDefinition().getFileType() - - ); - } - - public BusinessObjectDataNotificationRegistration getBORegisteredNotificationDetails(String notificationName) throws ApiException { - return new BusinessObjectDataNotificationRegistrationApi(dmApiClient) - .businessObjectDataNotificationRegistrationGetBusinessObjectDataNotificationRegistration(ags, notificationName); - } - - - public void createCluster( boolean startStatsCluster, String proposedName ) throws ApiException { - EmrApi emrApi = new EmrApi(dmApiClient); - EmrClusterCreateRequest request = new EmrClusterCreateRequest(); - request.setNamespace( ags ); - request.setDryRun(false); - - request.setEmrClusterDefinitionName(clusterDef); - if ( startStatsCluster ) { - request.setEmrClusterDefinitionName(clusterDefNameStats); - } - - request.setEmrClusterName( proposedName); - EmrCluster cluster = emrApi.eMRCreateEmrCluster(request); - log.info(cluster.toString()); - } - - public BusinessObjectDataSearchResult searchBOData(JobDefinition jd, int pageNum, int pageSize, Boolean filterOnValidLatestVersions) throws ApiException { - // Create Search Key - BusinessObjectDataSearchKey boDataSearchKeyItem = new BusinessObjectDataSearchKey(); - boDataSearchKeyItem.setNamespace(jd.getObjectDefinition().getNameSpace()); - boDataSearchKeyItem.setBusinessObjectDefinitionName(jd.getObjectDefinition().getObjectName()); - boDataSearchKeyItem.setBusinessObjectFormatUsage(jd.getObjectDefinition().getUsageCode()); - boDataSearchKeyItem.setBusinessObjectFormatFileType(jd.getObjectDefinition().getFileType()); - boDataSearchKeyItem.setFilterOnLatestValidVersion(filterOnValidLatestVersions); - - - // Search BO Data - return businessObjectDataApi.businessObjectDataSearchBusinessObjectData( - new BusinessObjectDataSearchRequest() - .addBusinessObjectDataSearchFiltersItem(new BusinessObjectDataSearchFilter() - .addBusinessObjectDataSearchKeysItem(boDataSearchKeyItem) - ) - , pageNum - , pageSize); - } + static { + javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier( + ( hostname, sslSession ) -> true ); + } + + public String getTableSchema( org.finra.herd.metastore.managed.JobDefinition jd, boolean replaceColumn ) throws ApiException { + + BusinessObjectFormatApi businessObjectFormatApi = new BusinessObjectFormatApi( dmApiClient ); + + BusinessObjectFormatDdlRequest request = new BusinessObjectFormatDdlRequest(); + request.setBusinessObjectDefinitionName( jd.getActualObjectName() ); + request.setBusinessObjectFormatFileType( jd.getObjectDefinition().getFileType() ); + request.setBusinessObjectFormatUsage( jd.getObjectDefinition().getUsageCode() ); + + request.setNamespace( jd.getObjectDefinition().getNameSpace() ); + request.setIncludeDropTableStatement( false ); + request.setIncludeIfNotExistsOption( !replaceColumn ); + request.setOutputFormat( BusinessObjectFormatDdlRequest.OutputFormatEnum.HIVE_13_DDL ); + request.setReplaceColumns( replaceColumn ); + + request.setTableName( jd.getTableName() ); + + BusinessObjectFormatDdl ddl = businessObjectFormatApi.businessObjectFormatGenerateBusinessObjectFormatDdl( request ); + + return ddl.getDdl(); + } + + public BusinessObjectFormat getDMFormat( org.finra.herd.metastore.managed.JobDefinition jd ) throws ApiException { + + BusinessObjectFormatApi businessObjectFormatApi = new BusinessObjectFormatApi( dmApiClient ); + + BusinessObjectFormat format = businessObjectFormatApi.businessObjectFormatGetBusinessObjectFormat( jd.getObjectDefinition().getNameSpace(), + jd.getActualObjectName(), jd.getObjectDefinition().getUsageCode(), jd.getObjectDefinition().getFileType(), null ); + + return format; + } + + public BusinessObjectDataDdl getBusinessObjectDataDdl( org.finra.herd.metastore.managed.JobDefinition jd, List partitions ) throws ApiException { + BusinessObjectDataDdlRequest request = new BusinessObjectDataDdlRequest(); + + request.setIncludeDropTableStatement( false ); + request.setOutputFormat( BusinessObjectDataDdlRequest.OutputFormatEnum.HIVE_13_DDL ); + request.setBusinessObjectFormatUsage( jd.getObjectDefinition().getUsageCode() ); + request.setBusinessObjectFormatFileType( jd.getObjectDefinition().getFileType() ); + request.setBusinessObjectDefinitionName( jd.getObjectDefinition().getObjectName() ); + request.setAllowMissingData( true ); + request.setIncludeDropPartitions( true ); + request.setIncludeIfNotExistsOption( true ); + request.setTableName( jd.getTableName() ); + + List partitionValueFilters = Lists.newArrayList(); + + log.info( "PartitionKey: {} \t Partitions: {}", jd.getPartitionKey(), partitions ); + if ( MetastoreUtil.isPartitionedSingleton( jd.getWfType(), jd.getPartitionKey() ) ) { + addPartitionedSingletonFilter( jd, partitionValueFilters ); + } else { + + if ( MetastoreUtil.isNonPartitionedSingleton( jd.getWfType(), jd.getPartitionKey() ) ) { + addPartitionFilter( jd.getPartitionKey(), Lists.newArrayList( "none" ), partitionValueFilters ); + } else { + if ( jd.isSubPartitionLevelProcessing() ) { + addSubPartitionFilter( jd, partitionValueFilters ); + } else { + addPartitionFilter( jd.getPartitionKey(), partitions, partitionValueFilters ); + } + } + } + + request.setPartitionValueFilter( null ); + request.setPartitionValueFilters( partitionValueFilters ); + request.setNamespace( jd.getObjectDefinition().getNameSpace() ); + + log.info( "Get BO DDL Request: \n{}", request.toString() ); + return businessObjectDataApi.businessObjectDataGenerateBusinessObjectDataDdl( request ); + } + + private void addPartitionFilter( String partitionKey, List partitions, List partitionValueFilters ) { + PartitionValueFilter filter = new PartitionValueFilter(); + filter.setPartitionKey( partitionKey ); + filter.setPartitionValues( partitions ); + partitionValueFilters.add( filter ); + } + + private void addPartitionedSingletonFilter( JobDefinition jd, List partitionValueFilters ) { + Calendar c = Calendar.getInstance(); + c.add( Calendar.DATE, 1 ); + String date = new SimpleDateFormat( "YYYY-MM-dd" ).format( c.getTime() ); + LatestBeforePartitionValue value = new LatestBeforePartitionValue(); + value.setPartitionValue( date ); + + PartitionValueFilter filter = new PartitionValueFilter(); + filter.setPartitionKey( jd.getPartitionKey() ); + filter.setLatestBeforePartitionValue( value ); + filter.setPartitionValues( null ); + filter.setLatestAfterPartitionValue( null ); + partitionValueFilters.add( filter ); + } + + /** + * To partition filter with sub partitions + * + * @param jd + * @param partitionValueFilters + * @throws ApiException + */ + private void addSubPartitionFilter( JobDefinition jd, List partitionValueFilters ) throws ApiException { + List partitionKeys = getDMFormat( jd ).getSchema().getPartitions(); + log.info( "Partition Keys {} for {}", partitionKeys, jd.getTableName() ); + Map partitionKeyValues = Maps.newLinkedHashMap(); + + IntStream.range( 0, jd.getPartitionValues().size() ) + .forEach( i -> { + String partitionKey = partitionKeys.get( i ).getName(); + String partitionValue = jd.getPartitionValues().get( i ); + log.info( "Partition Key: {}\t Value: {}", partitionKey, partitionValue ); + partitionKeyValues.put( partitionKey, partitionValue ); + + addPartitionFilter( partitionKey, Lists.newArrayList( partitionValue ), partitionValueFilters ); + } ); + + jd.setPartitionsKeyValue( partitionKeyValues ); + } + + public BusinessObjectFormatKeys getBOAllFormatVersions( org.finra.herd.metastore.managed.JobDefinition od, boolean latestBusinessObjectFormatVersion ) throws ApiException { + + return new BusinessObjectFormatApi( dmApiClient ) + .businessObjectFormatGetBusinessObjectFormats( + od.getObjectDefinition().getNameSpace() + , od.getObjectDefinition().getObjectName() + , latestBusinessObjectFormatVersion ); + } + + public BusinessObjectDataNotificationRegistrationKeys getBORegisteredNotification( org.finra.herd.metastore.managed.JobDefinition od ) throws ApiException { + return new BusinessObjectDataNotificationRegistrationApi( dmApiClient ) + .businessObjectDataNotificationRegistrationGetBusinessObjectDataNotificationRegistrationsByNotificationFilter( + od.getObjectDefinition().getNameSpace() + , od.getObjectDefinition().getObjectName() + , od.getObjectDefinition().getUsageCode() + , od.getObjectDefinition().getFileType() + + ); + } + + public BusinessObjectDataNotificationRegistration getBORegisteredNotificationDetails( String notificationName ) throws ApiException { + return new BusinessObjectDataNotificationRegistrationApi( dmApiClient ) + .businessObjectDataNotificationRegistrationGetBusinessObjectDataNotificationRegistration( ags, notificationName ); + } + + + public void createCluster( boolean startStatsCluster, String proposedName ) throws ApiException { + EmrApi emrApi = new EmrApi( dmApiClient ); + EmrClusterCreateRequest request = new EmrClusterCreateRequest(); + request.setNamespace( ags ); + request.setDryRun( false ); + + request.setEmrClusterDefinitionName( clusterDef ); + if ( startStatsCluster ) { + request.setEmrClusterDefinitionName( clusterDefNameStats ); + } + + request.setEmrClusterName( proposedName ); + EmrCluster cluster = emrApi.eMRCreateEmrCluster( request ); + log.info( cluster.toString() ); + } + + public BusinessObjectDataSearchResult searchBOData( JobDefinition jd, int pageNum, int pageSize, Boolean filterOnValidLatestVersions ) throws ApiException { + // Create Search Key + BusinessObjectDataSearchKey boDataSearchKeyItem = new BusinessObjectDataSearchKey(); + boDataSearchKeyItem.setNamespace( jd.getObjectDefinition().getNameSpace() ); + boDataSearchKeyItem.setBusinessObjectDefinitionName( jd.getObjectDefinition().getObjectName() ); + boDataSearchKeyItem.setBusinessObjectFormatUsage( jd.getObjectDefinition().getUsageCode() ); + boDataSearchKeyItem.setBusinessObjectFormatFileType( jd.getObjectDefinition().getFileType() ); + boDataSearchKeyItem.setFilterOnLatestValidVersion( filterOnValidLatestVersions ); + + + // Search BO Data + return businessObjectDataApi.businessObjectDataSearchBusinessObjectData( + new BusinessObjectDataSearchRequest() + .addBusinessObjectDataSearchFiltersItem( new BusinessObjectDataSearchFilter() + .addBusinessObjectDataSearchKeysItem( boDataSearchKeyItem ) + ) + , pageNum + , pageSize ); + } + + public void filterPartitionsAsPerAvailability( JobDefinition jd, List partitions ) throws ApiException { + log.info( "Checking Partitions Availability: {}", partitions ); + + BusinessObjectDataAvailabilityRequest request = new BusinessObjectDataAvailabilityRequest(); + + request.setNamespace( jd.getObjectDefinition().getNameSpace() ); + request.setBusinessObjectDefinitionName( jd.getObjectDefinition().getObjectName() ); + request.setBusinessObjectFormatFileType( jd.getObjectDefinition().getFileType() ); + request.setBusinessObjectFormatUsage( jd.getObjectDefinition().getUsageCode() ); + + PartitionValueFilter partitionValueFilter = new PartitionValueFilter(); + partitionValueFilter.setPartitionKey( jd.getPartitionKey() ); + partitionValueFilter.setPartitionValues( partitions ); + + request.setPartitionValueFilter( partitionValueFilter ); + + businessObjectDataApi.businessObjectDataCheckBusinessObjectDataAvailability( request ) + .getNotAvailableStatuses() + .stream() + .forEach( as -> { + log.info( "Removing => " + as.getPartitionValue() ); + partitions.remove( as.getPartitionValue() ); + } ); + + log.info( "Filtered Partitions: {}", partitions ); + } } diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/BackLoadObjectProcessor.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/BackLoadObjectProcessor.java index 352a87d3..72c8ec1e 100644 --- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/BackLoadObjectProcessor.java +++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/BackLoadObjectProcessor.java @@ -133,15 +133,9 @@ private void identifyPartitionsAndBackLoad( JobDefinition od, JobSubmitterInfo j if ( !ts.isEmpty() ) { if ( jsi.isPartitionDateType() ) { - if ( Strings.isNullOrEmpty( startDate.get() ) ) { - startDate.set( ts.first() ); - } - // To include skipped partition dates - using end date of the previous chunk - String endDate = ts.last(); - jsi.setPartitionValues( String.format( "%s%s%s", endDate, JobProcessorConstants.DOUBLE_UNDERSCORE, startDate ) ); + jsi.setPartitionValues( String.format( "%s%s%s", ts.last(), JobProcessorConstants.DOUBLE_UNDERSCORE, ts.first() ) ); addPartitions( jsi ); - startDate.set( endDate ); } else { jsi.setPartitionValues( delimitedPartitionValues( ts.descendingSet() ) ); addPartitions( jsi ); diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/HiveHqlGenerator.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/HiveHqlGenerator.java index 27868b8c..3c83b74c 100644 --- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/HiveHqlGenerator.java +++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/HiveHqlGenerator.java @@ -281,6 +281,9 @@ protected void addAnalyzeStats( JobDefinition jd, List partitions ) { if ( partitions.size() == 1 ) { submitStatsJob( jd, jd.partitionValuesForStats(partitions.get( 0 )) ); } else { + //Filter not available Partitions + dataMgmtSvc.filterPartitionsAsPerAvailability( jd, partitions ); + partitions.stream() .forEach( s -> submitStatsJob( jd, s ) ); } diff --git a/metastor/metastoreOperations/pom.xml b/metastor/metastoreOperations/pom.xml index 1977568d..8a3f0b26 100644 --- a/metastor/metastoreOperations/pom.xml +++ b/metastor/metastoreOperations/pom.xml @@ -20,7 +20,7 @@ metastore org.finra.herd-mdl.metastore - 1.2.38 + 1.2.43 4.0.0 diff --git a/metastor/pom.xml b/metastor/pom.xml index fe968111..ca7ae71f 100644 --- a/metastor/pom.xml +++ b/metastor/pom.xml @@ -29,7 +29,7 @@ metastore org.finra.herd-mdl.metastore - 1.2.38 + 1.2.43 pom diff --git a/pom.xml b/pom.xml index 422ca3a6..0d15982a 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ org.finra.herd-mdl herd-mdl pom - 1.4.0 + 1.5.0 This is the base pom for the herd-mdl project.