Skip to content

Aws sdk v2 nio #6126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions modules/nf-commons/src/main/nextflow/file/FileHelper.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,9 @@ class FileHelper {

@Override
FileVisitResult visitFile(Path fullPath, BasicFileAttributes attrs) throws IOException {
final path = folder.relativize(fullPath)
def path = fullPath
if( fullPath.isAbsolute() )
path = folder.relativize(fullPath)
log.trace "visitFiles > file=$path; includeFile=$includeFile; matches=${matcher.matches(path)}; isRegularFile=${attrs.isRegularFile()}"

if (includeFile && matcher.matches(path) && (attrs.isRegularFile() || (options.followLinks == false && attrs.isSymbolicLink())) && (includeHidden || !isHidden(fullPath))) {
Expand Down Expand Up @@ -912,7 +914,9 @@ class FileHelper {
}

static protected Path relativize0(Path folder, Path fullPath) {
def result = folder.relativize(fullPath)
def result = fullPath
if( fullPath.isAbsolute() )
result = folder.relativize(fullPath)
String str
if( folder.is(FileSystems.default) || !(str=result.toString()).endsWith('/') )
return result
Expand Down
23 changes: 12 additions & 11 deletions plugins/nf-amazon/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,18 @@ dependencies {
compileOnly 'org.pf4j:pf4j:3.12.0'

api ('javax.xml.bind:jaxb-api:2.4.0-b180830.0359')
api ('com.amazonaws:aws-java-sdk-s3:1.12.777')
api ('com.amazonaws:aws-java-sdk-ec2:1.12.777')
api ('com.amazonaws:aws-java-sdk-batch:1.12.777')
api ('com.amazonaws:aws-java-sdk-iam:1.12.777')
api ('com.amazonaws:aws-java-sdk-ecs:1.12.777')
api ('com.amazonaws:aws-java-sdk-logs:1.12.777')
api ('com.amazonaws:aws-java-sdk-codecommit:1.12.777')
api ('com.amazonaws:aws-java-sdk-sts:1.12.777')
api ('com.amazonaws:aws-java-sdk-ses:1.12.777')
api ('software.amazon.awssdk:sso:2.26.26')
api ('software.amazon.awssdk:ssooidc:2.26.26')
api ('software.amazon.awssdk:s3:2.29.24')
api ('software.amazon.awssdk:ec2:2.29.24')
api ('software.amazon.awssdk:batch:2.29.24')
api ('software.amazon.awssdk:iam:2.29.24')
api ('software.amazon.awssdk:ecs:2.29.24')
api ('software.amazon.awssdk:cloudwatchlogs:2.29.24')
api ('software.amazon.awssdk:codecommit:2.29.24')
api ('software.amazon.awssdk:sts:2.29.24')
api ('software.amazon.awssdk:ses:2.29.24')
api ('software.amazon.awssdk:sso:2.29.24')
api ('software.amazon.awssdk:ssooidc:2.29.24')
api ("software.amazon.nio.s3:aws-java-nio-spi-for-s3:2.2.1")

constraints {
api 'com.fasterxml.jackson.core:jackson-databind:2.12.7.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
*/
package nextflow.cloud.aws

import nextflow.cloud.aws.nio.S3FileSystemProvider
import groovy.transform.CompileStatic
import nextflow.file.FileHelper
import nextflow.plugin.BasePlugin
import org.pf4j.PluginWrapper
import software.amazon.nio.spi.s3.NextflowS3FileSystemProvider

/**
* Nextflow plugin for Amazon extensions
*
Expand All @@ -35,9 +36,7 @@ class AmazonPlugin extends BasePlugin {
@Override
void start() {
super.start()
// disable aws sdk v1 warning
System.setProperty("aws.java.v1.disableDeprecationAnnouncement", "true")
FileHelper.getOrInstallProvider(S3FileSystemProvider)
FileHelper.getOrInstallProvider(NextflowS3FileSystemProvider)
}

}
216 changes: 65 additions & 151 deletions plugins/nf-amazon/src/main/nextflow/cloud/aws/AwsClientFactory.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,34 @@

package nextflow.cloud.aws

import com.amazonaws.AmazonClientException
import com.amazonaws.ClientConfiguration
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSCredentialsProviderChain
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.AnonymousAWSCredentials
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider
import com.amazonaws.auth.SystemPropertiesCredentialsProvider
import com.amazonaws.auth.WebIdentityTokenCredentialsProvider
import com.amazonaws.auth.profile.ProfileCredentialsProvider
import com.amazonaws.auth.profile.ProfilesConfigFile
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.profile.path.AwsProfileFileLocationProvider
import com.amazonaws.regions.InstanceMetadataRegionProvider
import com.amazonaws.regions.Region
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.batch.AWSBatch
import com.amazonaws.services.batch.AWSBatchClient
import com.amazonaws.services.batch.AWSBatchClientBuilder
import com.amazonaws.services.ec2.AmazonEC2
import com.amazonaws.services.ec2.AmazonEC2Client
import com.amazonaws.services.ec2.AmazonEC2ClientBuilder
import com.amazonaws.services.ecs.AmazonECS
import com.amazonaws.services.ecs.AmazonECSClientBuilder
import com.amazonaws.services.logs.AWSLogs
import com.amazonaws.services.logs.AWSLogsAsyncClientBuilder
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider
import software.amazon.awssdk.core.exception.SdkClientException
import software.amazon.awssdk.http.SdkHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.providers.InstanceProfileRegionProvider
import software.amazon.awssdk.services.batch.BatchClient
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient
import software.amazon.awssdk.services.ec2.Ec2Client
import software.amazon.awssdk.services.ecs.EcsClient
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.S3Configuration
import software.amazon.awssdk.services.sts.StsClient
import software.amazon.awssdk.services.sts.model.GetCallerIdentityRequest
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
import nextflow.SysEnv
import nextflow.cloud.aws.config.AwsConfig
import nextflow.cloud.aws.util.ConfigParser
import nextflow.cloud.aws.util.S3CredentialsProvider
import nextflow.cloud.aws.util.SsoCredentialsProviderV1

import nextflow.exception.AbortOperationException
import software.amazon.awssdk.services.sts.model.StsException

/**
* Implement a factory class for AWS client objects
*
Expand Down Expand Up @@ -136,11 +123,10 @@ class AwsClientFactory {
* it's not a EC2 instance
*/
protected String fetchIamRole() {
try {
def stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient();
return stsClient.getCallerIdentity(new GetCallerIdentityRequest()).getArn()
}
catch( AmazonClientException e ) {
try{
StsClient stsClient = StsClient.create()
return stsClient.getCallerIdentity(GetCallerIdentityRequest.builder().build() as GetCallerIdentityRequest).arn();
} catch ( StsException e) {
log.trace "Unable to fetch IAM credentials -- Cause: ${e.message}"
return null
}
Expand All @@ -156,11 +142,10 @@ class AwsClientFactory {
*/
private String fetchRegion() {
try {
return new InstanceMetadataRegionProvider().getRegion()
}
catch (AmazonClientException e) {
log.debug("Cannot fetch AWS region", e as Throwable)
return null
return new InstanceProfileRegionProvider().getRegion().id();
} catch ( SdkClientException e) {
log.debug("Cannot fetch AWS region", e);
return null;
}
}

Expand All @@ -171,148 +156,77 @@ class AwsClientFactory {
* @return A {@link Region} corresponding to the specified region string
*/
private Region getRegionObj(String region) {
final result = RegionUtils.getRegion(region)
final result = Region.of(region)
if( !result )
throw new IllegalArgumentException("Not a valid AWS region name: $region");
return result
}

/**
* Gets or lazily creates an {@link AmazonEC2Client} instance given the current
* Gets or lazily creates an {@link Ec2Client} instance given the current
* configuration parameter
*
* @return
* An {@link AmazonEC2Client} instance
* An {@link Ec2Client} instance
*/
synchronized AmazonEC2 getEc2Client() {

final builder = AmazonEC2ClientBuilder
.standard()
.withRegion(region)

final credentials = getCredentialsProvider0()
if( credentials )
builder.withCredentials(credentials)

return builder.build()
synchronized Ec2Client getEc2Client() {
return Ec2Client.builder().region(getRegionObj(region)).credentialsProvider(getCredentialsProvider0()).build()
}

/**
* Gets or lazily creates an {@link AWSBatchClient} instance given the current
* Gets or lazily creates an {@link BatchClient} instance given the current
* configuration parameter
*
* @return
* An {@link AWSBatchClient} instance
* An {@link BatchClient} instance
*/
@Memoized
AWSBatch getBatchClient() {
final builder = AWSBatchClientBuilder
.standard()
.withRegion(region)

final credentials = getCredentialsProvider0()
if( credentials )
builder.withCredentials(credentials)

return builder.build()
BatchClient getBatchClient() {
return BatchClient.builder().region(getRegionObj(region)).credentialsProvider(getCredentialsProvider0()).build()
}

@Memoized
AmazonECS getEcsClient() {

final builder = AmazonECSClientBuilder
.standard()
.withRegion(region)

final credentials = getCredentialsProvider0()
if( credentials )
builder.withCredentials(credentials)

return builder.build()
EcsClient getEcsClient() {
return EcsClient.builder().region(getRegionObj(region)).credentialsProvider(getCredentialsProvider0()).build()
}

@Memoized
AWSLogs getLogsClient() {

final builder = AWSLogsAsyncClientBuilder
.standard()
.withRegion(region)

final credentials = getCredentialsProvider0()
if( credentials )
builder.withCredentials(credentials)

return builder.build()
CloudWatchLogsClient getLogsClient() {
return CloudWatchLogsClient.builder().region(getRegionObj(region)).credentialsProvider(getCredentialsProvider0()).build()
}

AmazonS3 getS3Client(ClientConfiguration clientConfig=null, boolean global=false) {
final builder = AmazonS3ClientBuilder
.standard()
.withPathStyleAccessEnabled(config.s3Config.pathStyleAccess)
.withForceGlobalBucketAccessEnabled(global)

final endpoint = config.s3Config.endpoint
if( endpoint )
builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
else
builder.withRegion(region)

final credentials = config.s3Config.anonymous
? new AWSStaticCredentialsProvider(new AnonymousAWSCredentials())
: new S3CredentialsProvider(getCredentialsProvider0())
builder.withCredentials(credentials)
S3Client getS3Client(SdkHttpClient httpClient = null, boolean global = false) {
def builder = S3Client.builder()
.credentialsProvider(config.s3Config.anonymous ? AnonymousCredentialsProvider.create() : new S3CredentialsProvider(getCredentialsProvider0()))
.serviceConfiguration(S3Configuration.builder()
.pathStyleAccessEnabled(config.s3Config.pathStyleAccess)
.build())

if (config.s3Config.endpoint) {
builder.endpointOverride(URI.create(config.s3Config.endpoint))
} else {
builder.region(getRegionObj(region))
}

if( clientConfig )
builder.withClientConfiguration(clientConfig)
if (httpClient != null) {
builder.httpClient(httpClient)
}

return builder.build()
}

protected AWSCredentialsProvider getCredentialsProvider0() {
if( accessKey && secretKey ) {
final creds = new BasicAWSCredentials(accessKey, secretKey)
return new AWSStaticCredentialsProvider(creds)
protected AwsCredentialsProvider getCredentialsProvider0() {
if (accessKey && secretKey) {
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))
}

if( profile ) {
return new AWSCredentialsProviderChain(List.of(
new ProfileCredentialsProvider(configFile(), profile),
new SsoCredentialsProviderV1(profile)))
if (profile) {
return ProfileCredentialsProvider.builder()
.profileName(profile)
.build()
}

return new AWSCredentialsProviderChain(List.of(
new EnvironmentVariableCredentialsProvider(),
new SystemPropertiesCredentialsProvider(),
WebIdentityTokenCredentialsProvider.create(),
new ProfileCredentialsProvider(configFile(), null),
new SsoCredentialsProviderV1(),
new EC2ContainerCredentialsProviderWrapper()))
return DefaultCredentialsProvider.create()
}

static ProfilesConfigFile configFile() {
final creds = AwsProfileFileLocationProvider.DEFAULT_CREDENTIALS_LOCATION_PROVIDER.getLocation()
final config = AwsProfileFileLocationProvider.DEFAULT_CONFIG_LOCATION_PROVIDER.getLocation()
if( creds && config && SysEnv.get('NXF_DISABLE_AWS_CONFIG_MERGE')!='true' ) {
log.debug "Merging AWS credentials file '$creds' and config file '$config'"
final parser = new ConfigParser()
// add the credentials first because it has higher priority
parser.parseConfig(creds.text)
// add also the content of config file
parser.parseConfig(config.text)
final temp = File.createTempFile('aws','config')
// merge into a temporary file
temp.deleteOnExit()
temp.text = parser.text()
return new ProfilesConfigFile(temp.absolutePath)
}
if( creds ) {
log.debug "Using AWS credentials file '$creds'"
return new ProfilesConfigFile(creds)
}
if( config ) {
log.debug "Using AWS config file '$config'"
return new ProfilesConfigFile(config)
}
return null
}
}
Loading
Loading