From 62ef3c06af1a2f0369002cbaeba068026becaa89 Mon Sep 17 00:00:00 2001 From: liam wadman Date: Wed, 9 Jul 2025 14:16:43 -0700 Subject: [PATCH 01/17] added support for AWS temp creds being set with AWS configure --- .../next-release/configure-tempcreds.json | 6 ++ awscli/customizations/configure/configure.py | 48 +++++++++- awscli/customizations/configure/writer.py | 36 ++++---- .../configure/test_configure.py | 90 +++++++++++++++++++ 4 files changed, 159 insertions(+), 21 deletions(-) create mode 100644 .changes/next-release/configure-tempcreds.json diff --git a/.changes/next-release/configure-tempcreds.json b/.changes/next-release/configure-tempcreds.json new file mode 100644 index 000000000000..735dc45c7750 --- /dev/null +++ b/.changes/next-release/configure-tempcreds.json @@ -0,0 +1,6 @@ + +{ + "type": "enhancement", + "category": "``Configure``", + "description": "Added Support for AWS temp creds with AWS configure, e.g. CLI will ask for aws_session_token if the access key id is temporary" +} \ No newline at end of file diff --git a/awscli/customizations/configure/configure.py b/awscli/customizations/configure/configure.py index 93eb5e53a81f..1cc827b23f5a 100644 --- a/awscli/customizations/configure/configure.py +++ b/awscli/customizations/configure/configure.py @@ -43,9 +43,13 @@ def register_configure_cmd(cli): class InteractivePrompter: def get_value(self, current_value, config_name, prompt_text=''): - if config_name in ('aws_access_key_id', 'aws_secret_access_key'): + if config_name in ( + 'aws_access_key_id', + 'aws_secret_access_key', + 'aws_session_token', + ): current_value = mask_value(current_value) - response = compat_input("%s [%s]: " % (prompt_text, current_value)) + response = compat_input(f"{prompt_text} [{current_value}]: ") if not response: # If the user hits enter, we return a value of None # instead of an empty string. That way we can determine @@ -100,7 +104,7 @@ class ConfigureCommand(BasicCommand): ] def __init__(self, session, prompter=None, config_writer=None): - super(ConfigureCommand, self).__init__(session) + super().__init__(session) if prompter is None: prompter = InteractivePrompter() self._prompter = prompter @@ -117,6 +121,10 @@ def _run_main(self, parsed_args, parsed_globals): config = self._session.get_scoped_config() except ProfileNotFound: config = {} + + # Track if we need to prompt for session token + needs_session_token = False + for config_name, prompt_text in self.VALUES_TO_PROMPT: current_value = config.get(config_name) new_value = self._prompter.get_value( @@ -124,6 +132,36 @@ def _run_main(self, parsed_args, parsed_globals): ) if new_value is not None and new_value != current_value: new_values[config_name] = new_value + + # Check if this is a temporary credential (starts with ASIA) + if ( + config_name == 'aws_access_key_id' + and new_value + and new_value.startswith('ASIA') + ): + needs_session_token = True + + # Prompt for session token after secret key but before region + if config_name == 'aws_secret_access_key' and needs_session_token: + session_token_current = config.get('aws_session_token') + session_token_new = self._prompter.get_value( + session_token_current, + 'aws_session_token', + 'AWS Session Token', + ) + if ( + session_token_new is not None + and session_token_new != session_token_current + ): + new_values['aws_session_token'] = session_token_new + + # Remove session token for non-temporary credentials + if ( + 'aws_access_key_id' in new_values + and not needs_session_token + and config.get('aws_session_token') + ): + new_values['aws_session_token'] = None config_filename = os.path.expanduser( self._session.get_config_variable('config_file') ) @@ -150,6 +188,10 @@ def _write_out_creds_file_values(self, new_values, profile_name): credential_file_values['aws_secret_access_key'] = new_values.pop( 'aws_secret_access_key' ) + if 'aws_session_token' in new_values: + credential_file_values['aws_session_token'] = new_values.pop( + 'aws_session_token' + ) if credential_file_values: if profile_name is not None: credential_file_values['__section__'] = profile_name diff --git a/awscli/customizations/configure/writer.py b/awscli/customizations/configure/writer.py index d4daa9dea35a..8a173ab68673 100644 --- a/awscli/customizations/configure/writer.py +++ b/awscli/customizations/configure/writer.py @@ -93,7 +93,7 @@ def _write_new_section(self, section_name, new_values, config_filename): with open(config_filename, 'a') as f: if needs_newline: f.write('\n') - f.write('[%s]\n' % section_name) + f.write(f'[{section_name}]\n') contents = [] self._insert_new_values( line_number=0, contents=contents, new_values=new_values @@ -148,8 +148,12 @@ def _update_section_contents(self, contents, section_name, new_values): # out now. if not isinstance(new_values[key_name], dict): option_value = new_values[key_name] - new_line = '%s = %s\n' % (key_name, option_value) - contents[j] = new_line + if option_value is None: + # Remove the line by replacing with empty string + contents[j] = '' + else: + new_line = f'{key_name} = {option_value}\n' + contents[j] = new_line del new_values[key_name] else: j = self._update_subattributes( @@ -182,10 +186,8 @@ def _update_subattributes(self, index, contents, values, starting_indent): key_name = match.group(1).strip() if key_name in values: option_value = values[key_name] - new_line = '%s%s = %s\n' % ( - ' ' * current_indent, - key_name, - option_value, + new_line = ( + f"{' ' * current_indent}{key_name} = {option_value}\n" ) contents[i] = new_line del values[key_name] @@ -208,23 +210,21 @@ def _insert_new_values(self, line_number, contents, new_values, indent=''): for key, value in list(new_values.items()): if isinstance(value, dict): subindent = indent + ' ' - new_contents.append('%s%s =\n' % (indent, key)) + new_contents.append(f'{indent}{key} =\n') for subkey, subval in list(value.items()): - new_contents.append( - '%s%s = %s\n' % (subindent, subkey, subval) - ) - else: - new_contents.append('%s%s = %s\n' % (indent, key, value)) + new_contents.append(f'{subindent}{subkey} = {subval}\n') + elif value is not None: + new_contents.append(f'{indent}{key} = {value}\n') del new_values[key] - contents.insert(line_number + 1, ''.join(new_contents)) + if new_contents: + contents.insert(line_number + 1, ''.join(new_contents)) def _matches_section(self, match, section_name): parts = section_name.split(' ') - unquoted_match = match.group(0) == '[%s]' % section_name + unquoted_match = match.group(0) == f'[{section_name}]' if len(parts) > 1: - quoted_match = match.group(0) == '[%s "%s"]' % ( - parts[0], - ' '.join(parts[1:]), + quoted_match = ( + match.group(0) == f'[{parts[0]} "{" ".join(parts[1:])}"]' ) return unquoted_match or quoted_match return unquoted_match diff --git a/tests/unit/customizations/configure/test_configure.py b/tests/unit/customizations/configure/test_configure.py index 386de2be0528..d68c33c81385 100644 --- a/tests/unit/customizations/configure/test_configure.py +++ b/tests/unit/customizations/configure/test_configure.py @@ -154,6 +154,84 @@ def test_session_says_profile_does_not_exist(self): 'myconfigfile', ) + def test_temporary_credentials_prompts_for_session_token(self): + # When user enters temporary credentials (starting with ASIA), + # should prompt for session token after secret key + responses = { + "AWS Access Key ID": "ASIATEMP123456789", + "AWS Secret Access Key": "secret123", + "AWS Session Token": "session_token_123", + "Default region name": "us-west-2", + "Default output format": "json", + } + prompter = KeyValuePrompter(responses) + self.configure = configure.ConfigureCommand( + self.session, prompter=prompter, config_writer=self.writer + ) + self.configure(args=[], parsed_globals=self.global_args) + + # Should write all three credential values to credentials file + self.assert_credentials_file_updated_with( + { + 'aws_access_key_id': 'ASIATEMP123456789', + 'aws_secret_access_key': 'secret123', + 'aws_session_token': 'session_token_123', + } + ) + + # Non-credentials config is written to the config file + self.writer.update_config.assert_called_with( + {'region': 'us-west-2', 'output': 'json'}, 'myconfigfile' + ) + + def test_regular_credentials_no_session_token_prompt(self): + # When user enters regular credentials (not starting with ASIA), + # should NOT prompt for session token + responses = { + "AWS Access Key ID": "AKIAREGULAR123456", + "AWS Secret Access Key": "secret123", + "Default region name": "us-west-2", + "Default output format": "json", + } + prompter = KeyValuePrompter(responses) + self.configure = configure.ConfigureCommand( + self.session, prompter=prompter, config_writer=self.writer + ) + self.configure(args=[], parsed_globals=self.global_args) + + # Should only write access key and secret key (no session token) + self.assert_credentials_file_updated_with( + { + 'aws_access_key_id': 'AKIAREGULAR123456', + 'aws_secret_access_key': 'secret123', + } + ) + + def test_iam_user_credentials_remove_session_token(self): + # When configuring IAM user credentials (AKIA), existing session token should be removed + session = FakeSession({'config_file': 'myconfigfile'}) + session.config = {'aws_session_token': 'existing_token'} + responses = { + "AWS Access Key ID": "AKIAUSER123456789", + "AWS Secret Access Key": "secret123", + "Default region name": "us-west-2", + "Default output format": "json", + } + prompter = KeyValuePrompter(responses) + self.configure = configure.ConfigureCommand( + session, prompter=prompter, config_writer=self.writer + ) + self.configure(args=[], parsed_globals=self.global_args) + + # Should write credentials and remove session token (set to None) + self.assert_credentials_file_updated_with( + { + 'aws_access_key_id': 'AKIAUSER123456789', + 'aws_secret_access_key': 'secret123', + 'aws_session_token': None, + } + ) + class TestInteractivePrompter(unittest.TestCase): def setUp(self): @@ -219,6 +297,18 @@ def test_non_secret_keys_are_not_masked(self): self.assertIn('mycurrentvalue', prompt_text) self.assertRegex(prompt_text, r'\[mycurrentvalue\]') + def test_session_token_is_masked(self): + prompter = configure.InteractivePrompter() + prompter.get_value( + current_value='mysessiontoken123', + config_name='aws_session_token', + prompt_text='Session Token', + ) + # Session token should be masked like other credentials + prompt_text = self.stdout.getvalue() + self.assertNotIn('mysessiontoken123', prompt_text) + self.assertRegex(prompt_text, r'\[\*\*\*\*.*\]') + def test_user_hits_enter_returns_none(self): # If a user hits enter, then raw_input returns the empty string. self.mock_raw_input.return_value = '' From b2fbf50408e0c9ab2dec55abcdfa828400d81233 Mon Sep 17 00:00:00 2001 From: liam wadman Date: Thu, 10 Jul 2025 13:43:19 -0700 Subject: [PATCH 02/17] adding configure mfa-login --- .../next-release/configure-tempcreds.json | 10 +- awscli/customizations/configure/configure.py | 13 + awscli/customizations/configure/mfalogin.py | 400 +++++++++++++ awscli/examples/configure/mfa-login.rst | 49 ++ .../configure/mfa-login/_description.rst | 3 + .../configure/mfa-login/_examples.rst | 47 ++ .../customizations/configure/test_mfalogin.py | 539 ++++++++++++++++++ 7 files changed, 1059 insertions(+), 2 deletions(-) create mode 100644 awscli/customizations/configure/mfalogin.py create mode 100644 awscli/examples/configure/mfa-login.rst create mode 100644 awscli/examples/configure/mfa-login/_description.rst create mode 100644 awscli/examples/configure/mfa-login/_examples.rst create mode 100644 tests/unit/customizations/configure/test_mfalogin.py diff --git a/.changes/next-release/configure-tempcreds.json b/.changes/next-release/configure-tempcreds.json index 735dc45c7750..c3c6ce23acec 100644 --- a/.changes/next-release/configure-tempcreds.json +++ b/.changes/next-release/configure-tempcreds.json @@ -1,6 +1,12 @@ - +[] { "type": "enhancement", "category": "``Configure``", "description": "Added Support for AWS temp creds with AWS configure, e.g. CLI will ask for aws_session_token if the access key id is temporary" -} \ No newline at end of file +}, +{ + "type": "enhancement", + "category": "``Configure``", + "description": "Added new sub-command to aws confugre, `mfa-login`, which is an interactive wrapper for the aws sts get-session-token command" +}, +] \ No newline at end of file diff --git a/awscli/customizations/configure/configure.py b/awscli/customizations/configure/configure.py index 1cc827b23f5a..15bdbbd2a053 100644 --- a/awscli/customizations/configure/configure.py +++ b/awscli/customizations/configure/configure.py @@ -12,6 +12,7 @@ # language governing permissions and limitations under the License. import logging import os +import sys from botocore.exceptions import ProfileNotFound @@ -25,6 +26,7 @@ from awscli.customizations.configure.importer import ConfigureImportCommand from awscli.customizations.configure.list import ConfigureListCommand from awscli.customizations.configure.listprofiles import ListProfilesCommand +from awscli.customizations.configure.mfalogin import ConfigureMFALoginCommand from awscli.customizations.configure.set import ConfigureSetCommand from awscli.customizations.configure.sso import ( ConfigureSSOCommand, @@ -88,6 +90,7 @@ class ConfigureCommand(BasicCommand): {'name': 'list-profiles', 'command_class': ListProfilesCommand}, {'name': 'sso', 'command_class': ConfigureSSOCommand}, {'name': 'sso-session', 'command_class': ConfigureSSOSessionCommand}, + {'name': 'mfa-login', 'command_class': ConfigureMFALoginCommand}, { 'name': 'export-credentials', 'command_class': ConfigureExportCredentialsCommand, @@ -114,6 +117,16 @@ def __init__(self, session, prompter=None, config_writer=None): def _run_main(self, parsed_args, parsed_globals): # Called when invoked with no args "aws configure" + # Check if there are any remaining unparsed arguments that might be invalid subcommands + if hasattr(parsed_args, 'remaining') and parsed_args.remaining: + sys.stderr.write( + f"Invalid subcommand: {' '.join(parsed_args.remaining)}\n" + ) + sys.stderr.write( + "Valid subcommands are: list, get, set, add-model, import, list-profiles, sso, sso-session, mfa-login, export-credentials\n" + ) + return 1 + new_values = {} # This is the config from the config file scoped to a specific # profile. diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py new file mode 100644 index 000000000000..ebb43ee3df21 --- /dev/null +++ b/awscli/customizations/configure/mfalogin.py @@ -0,0 +1,400 @@ +# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import logging +import os +import random +import string +import sys + +# Import botocore at module level to avoid repeated imports +import botocore.session +from botocore.exceptions import ClientError, ProfileNotFound + +from awscli.compat import compat_input +from awscli.customizations.commands import BasicCommand +from awscli.customizations.configure import profile_to_section +from awscli.customizations.configure.writer import ConfigFileWriter + +LOG = logging.getLogger(__name__) + + +class InteractiveMFAPrompter: + """Handles interactive prompting for MFA login.""" + + def get_value(self, current_value, prompt_text=''): + """Prompt for a value, showing the current value as a default.""" + response = compat_input(f"{prompt_text} [{current_value}]: ") + if not response: + # If the user hits enter, return the current value + return current_value + return response + + def get_credential_value(self, current_value, config_name, prompt_text=''): + """Prompt for credential values with masking for sensitive data.""" + response = compat_input(f"{prompt_text}: ") + if not response: + return None + return response + + +class ConfigureMFALoginCommand(BasicCommand): + """Configures MFA login for AWS CLI by creating temporary credentials.""" + + NAME = 'mfa-login' + DESCRIPTION = ( + 'Sets up temporary credentials for MFA authentication. ' + 'This command creates a new profile with temporary credentials ' + 'obtained using the AWS STS get-session-token API.' + ) + SYNOPSIS = ( + 'aws configure mfa-login [--profile profile-name] ' + '[--update-profile profile-to-update] [--duration-seconds seconds] ' + '[--serial-number mfa-serial-number]' + ) + EXAMPLES = ( + 'To create a new profile with temporary credentials::\n' + '\n' + ' $ aws configure mfa-login\n' + ' MFA serial number or ARN: arn:aws:iam::123456789012:mfa/user\n' + ' MFA token code: 123456\n' + ' Profile to update [session-12345]:\n' + '\n' + 'To update an existing profile with temporary credentials::\n' + '\n' + ' $ aws configure mfa-login --profile myprofile --update-profile mytemp\n' + ' MFA token code: 123456\n' + '\n' + 'To specify the MFA device serial number or ARN directly::\n' + '\n' + ' $ aws configure mfa-login --serial-number arn:aws:iam::123456789012:mfa/user\n' + ' MFA token code: 123456\n' + ' Profile to update [session-12345]:\n' + ) + ARG_TABLE = [ + { + 'name': 'profile', + 'help_text': ('Use a specific profile from your credential file.'), + 'action': 'store', + 'required': False, + 'cli_type_name': 'string', + }, + { + 'name': 'update-profile', + 'help_text': ( + 'The profile to update with temporary credentials. ' + 'If not provided, a default name will be generated.' + ), + 'action': 'store', + 'required': False, + 'cli_type_name': 'string', + }, + { + 'name': 'duration-seconds', + 'help_text': ( + 'The duration, in seconds, that the credentials should remain valid. ' + 'Default is 43200 seconds (12 hours).' + ), + 'action': 'store', + 'required': False, + 'cli_type_name': 'integer', + }, + { + 'name': 'serial-number', + 'help_text': ( + 'The ARN or serial number of the MFA device associated with the IAM user. ' + 'If not provided, will use the mfa_serial from the profile configuration.' + ), + 'action': 'store', + 'required': False, + 'cli_type_name': 'string', + }, + ] + + def __init__(self, session, prompter=None, config_writer=None): + super().__init__(session) + if prompter is None: + prompter = InteractiveMFAPrompter() + self._prompter = prompter + if config_writer is None: + config_writer = ConfigFileWriter() + self._config_writer = config_writer + + def _check_profile_exists(self, profile_name): + """Check if a profile exists using botocore's native profile handling.""" + session = botocore.session.Session() + return profile_name in session.available_profiles + + def _run_main(self, parsed_args, parsed_globals): + # Get the source profile for credentials + source_profile = parsed_globals.profile or 'default' + + # Get the target profile to update + target_profile = parsed_args.update_profile + + # Get duration seconds + duration_seconds = ( + parsed_args.duration_seconds or 43200 + ) # Default 12 hours + + # Import here to avoid circular imports + + # Create a new session with the specified profile + try: + # Use botocore's native profile handling + session = botocore.session.Session(profile=source_profile) + + # Check if profile exists + if ( + source_profile not in session.available_profiles + and source_profile != 'default' + ): + sys.stderr.write( + f"The profile ({source_profile}) could not be found. \n" + ) + return 1 + + # Get credentials + credentials = session.get_credentials() + if credentials is None: + # If no credentials found and using default profile and running interactively, prompt for them + if source_profile == 'default' and sys.stdin.isatty(): + return self._handle_missing_default_profile( + parsed_args, duration_seconds + ) + else: + sys.stderr.write( + f"Unable to locate credentials for profile {source_profile}\n" + ) + return 1 + + source_config = session.get_scoped_config() + except ProfileNotFound: + # If default profile not found and running interactively, prompt for credentials + if source_profile == 'default' and sys.stdin.isatty(): + return self._handle_missing_default_profile( + parsed_args, duration_seconds + ) + else: + sys.stderr.write( + f"The profile ({source_profile}) could not be found. \n" + ) + return 1 + except Exception as e: + sys.stderr.write( + f"Error accessing profile {source_profile}: {str(e)}\n" + ) + return 1 + + # Get MFA serial number + mfa_serial = parsed_args.serial_number or source_config.get( + 'mfa_serial' + ) + if not mfa_serial: + if sys.stdin.isatty(): + mfa_serial = self._prompter.get_credential_value( + 'None', 'mfa_serial', 'MFA serial number or ARN' + ) + if not mfa_serial: + sys.stderr.write("MFA serial number or ARN is required\n") + return 1 + else: + sys.stderr.write("MFA serial number or ARN is required\n") + return 1 + + # Get MFA token code + if sys.stdin.isatty(): + token_code = self._prompter.get_credential_value( + 'None', 'mfa_token', 'MFA token code' + ) + if not token_code: + sys.stderr.write("MFA token code is required\n") + return 1 + else: + sys.stderr.write("MFA token code is required\n") + return 1 + + # If no target profile is specified, generate a default name + if not target_profile: + random_suffix = ''.join( + random.choices(string.ascii_lowercase + string.digits, k=5) + ) + target_profile = f"session-{random_suffix}" + if sys.stdin.isatty(): + target_profile = self._prompter.get_value( + target_profile, 'Profile to update' + ) + + # Call STS to get temporary credentials + try: + sts_client = session.create_client('sts') + response = sts_client.get_session_token( + DurationSeconds=duration_seconds, + SerialNumber=mfa_serial, + TokenCode=token_code, + ) + except ClientError as e: + sys.stderr.write(f"An error occurred: {e}\n") + return 1 + + # Extract credentials from response + temp_credentials = response['Credentials'] + + # Write credentials to the credentials file + credentials_file = os.path.expanduser('~/.aws/credentials') + + # Prepare the values to write + credential_values = { + '__section__': target_profile, + 'aws_access_key_id': temp_credentials['AccessKeyId'], + 'aws_secret_access_key': temp_credentials['SecretAccessKey'], + 'aws_session_token': temp_credentials['SessionToken'], + } + + # Get expiration time as a string + try: + expiration_time = temp_credentials['Expiration'].strftime( + '%Y-%m-%d %H:%M:%S UTC' + ) + except Exception: + # Handle case where Expiration might not be a datetime object + expiration_time = str(temp_credentials['Expiration']) + + # Add expiration as a comment in the config file + credential_values['#expiration'] = ( + f"# Credentials expire at: {expiration_time}" + ) + + # Write to credentials file + self._config_writer.update_config(credential_values, credentials_file) + + sys.stdout.write( + f"Temporary credentials written to profile '{target_profile}'\n" + ) + sys.stdout.write(f"Credentials will expire at {expiration_time}\n") + sys.stdout.write( + f"To use these credentials, specify --profile {target_profile} when running AWS CLI commands\n" + ) + + return 0 + + def _handle_missing_default_profile(self, parsed_args, duration_seconds): + """Handle the case where no default profile exists by prompting for credentials.""" + sys.stdout.write( + "No default profile found. Please provide your AWS credentials:\n" + ) + + # Prompt for access key ID + access_key = self._prompter.get_credential_value( + 'None', 'aws_access_key_id', 'AWS Access Key ID' + ) + if not access_key or access_key == 'None': + sys.stderr.write("AWS Access Key ID is required\n") + return 1 + + # Prompt for secret access key + secret_key = self._prompter.get_credential_value( + 'None', 'aws_secret_access_key', 'AWS Secret Access Key' + ) + if not secret_key or secret_key == 'None': + sys.stderr.write("AWS Secret Access Key is required\n") + return 1 + + # Get MFA serial number + mfa_serial = parsed_args.serial_number + if not mfa_serial: + mfa_serial = self._prompter.get_credential_value( + 'None', 'mfa_serial', 'MFA serial number or ARN' + ) + if not mfa_serial: + sys.stderr.write("MFA serial number or ARN is required\n") + return 1 + + # Get MFA token code + token_code = self._prompter.get_credential_value( + 'None', 'mfa_token', 'MFA token code' + ) + if not token_code: + sys.stderr.write("MFA token code is required\n") + return 1 + + # Generate target profile name if not specified + target_profile = parsed_args.update_profile + if not target_profile: + random_suffix = ''.join( + random.choices(string.ascii_lowercase + string.digits, k=5) + ) + target_profile = f"session-{random_suffix}" + target_profile = self._prompter.get_value( + target_profile, 'Profile to update' + ) + + # Create a temporary session with the provided credentials + session = botocore.session.Session() + + try: + # Create STS client with the provided credentials + sts_client = session.create_client( + 'sts', + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + ) + + # Call STS to get temporary credentials + response = sts_client.get_session_token( + DurationSeconds=duration_seconds, + SerialNumber=mfa_serial, + TokenCode=token_code, + ) + except ClientError as e: + sys.stderr.write(f"An error occurred: {e}\n") + return 1 + + # Extract credentials from response + temp_credentials = response['Credentials'] + + # Write credentials to the credentials file + credentials_file = os.path.expanduser('~/.aws/credentials') + + # Prepare the values to write + credential_values = { + '__section__': target_profile, + 'aws_access_key_id': temp_credentials['AccessKeyId'], + 'aws_secret_access_key': temp_credentials['SecretAccessKey'], + 'aws_session_token': temp_credentials['SessionToken'], + } + + # Get expiration time as a string + try: + expiration_time = temp_credentials['Expiration'].strftime( + '%Y-%m-%d %H:%M:%S UTC' + ) + except Exception: + expiration_time = str(temp_credentials['Expiration']) + + # Add expiration as a comment in the config file + credential_values['#expiration'] = ( + f"# Credentials expire at: {expiration_time}" + ) + + # Write to credentials file + self._config_writer.update_config(credential_values, credentials_file) + + sys.stdout.write( + f"Temporary credentials written to profile '{target_profile}'\n" + ) + sys.stdout.write(f"Credentials will expire at {expiration_time}\n") + sys.stdout.write( + f"To use these credentials, specify --profile {target_profile} when running AWS CLI commands\n" + ) + + return 0 diff --git a/awscli/examples/configure/mfa-login.rst b/awscli/examples/configure/mfa-login.rst new file mode 100644 index 000000000000..5c21a165c7f2 --- /dev/null +++ b/awscli/examples/configure/mfa-login.rst @@ -0,0 +1,49 @@ +**To create a new profile with temporary MFA credentials** + +The following ``mfa-login`` command creates a new profile with temporary credentials obtained using MFA authentication. :: + + aws configure mfa-login + +Output:: + + MFA serial number or ARN: arn:aws:iam::123456789012:mfa/user + MFA token code: 123456 + Profile to update [session-12345]: + Temporary credentials written to profile 'session-12345' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile session-12345 when running AWS CLI commands + +**To create credentials when no default profile exists** + +If you don't have a default profile configured, the ``mfa-login`` command will prompt you for your AWS credentials first. :: + + aws configure mfa-login + +Output:: + + No default profile found. Please provide your AWS credentials: + AWS Access Key ID: AKIAIOSFODNN7EXAMPLE + AWS Secret Access Key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY + MFA serial number or ARN: arn:aws:iam::123456789012:mfa/user + MFA token code: 123456 + Profile to update [session-12345]: + Temporary credentials written to profile 'session-12345' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile session-12345 when running AWS CLI commands + +**To update an existing profile with temporary MFA credentials** + +The following ``mfa-login`` command updates an existing profile with temporary credentials obtained using MFA authentication. :: + + aws configure mfa-login --profile myprofile --update-profile mytemp + +Output:: + + MFA token code: 123456 + Temporary credentials written to profile 'mytemp' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile mytemp when running AWS CLI commands + +**Note:** This command currently supports only hardware or software based one-time password (OTP) authenticators. Passkeys and U2F devices are not currently supported. + +For more information, see `Using Multi-Factor Authentication (MFA) in AWS `__ in the *AWS IAM User Guide*. \ No newline at end of file diff --git a/awscli/examples/configure/mfa-login/_description.rst b/awscli/examples/configure/mfa-login/_description.rst new file mode 100644 index 000000000000..e028526c9c10 --- /dev/null +++ b/awscli/examples/configure/mfa-login/_description.rst @@ -0,0 +1,3 @@ +This command gets temporary AWS security credentials for use with the AWS CLI and SDK, and places them in an AWS profile. It will use a long-lived IAM user access key, and the MFA code from either a virtual TOTP MFA device, or a hardware OTP authenticator to call STS get-session-token to get the temporary credentials. If no default profile exists, the command will prompt you to provide your AWS credentials first. + +Note: This command currently supports only hardware or software based one-time password (OTP) authenticators. Passkeys and U2F devices are not currently supported. \ No newline at end of file diff --git a/awscli/examples/configure/mfa-login/_examples.rst b/awscli/examples/configure/mfa-login/_examples.rst new file mode 100644 index 000000000000..6643b6005316 --- /dev/null +++ b/awscli/examples/configure/mfa-login/_examples.rst @@ -0,0 +1,47 @@ +**To create a new profile with temporary MFA credentials** + +The following ``mfa-login`` command creates a new profile with temporary credentials obtained using MFA authentication. :: + + aws configure mfa-login + +Output:: + + MFA serial number or ARN: arn:aws:iam::123456789012:mfa/user + MFA token code: 123456 + Profile to update [session-12345]: + Temporary credentials written to profile 'session-12345' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile session-12345 when running AWS CLI commands + +**To update an existing profile with temporary MFA credentials** + +The following ``mfa-login`` command updates an existing profile with temporary credentials obtained using MFA authentication. :: + + aws configure mfa-login --profile myprofile --update-profile mytemp + +Output:: + + MFA token code: 123456 + Temporary credentials written to profile 'mytemp' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile mytemp when running AWS CLI commands + +**To create credentials when no default profile exists** + +If you don't have a default profile configured, the ``mfa-login`` command will prompt you for your AWS credentials first. :: + + aws configure mfa-login + +Output:: + + No default profile found. Please provide your AWS credentials: + AWS Access Key ID: AKIAIOSFODNN7EXAMPLE + AWS Secret Access Key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY + MFA serial number or ARN: arn:aws:iam::123456789012:mfa/user + MFA token code: 123456 + Profile to update [session-12345]: + Temporary credentials written to profile 'session-12345' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile session-12345 when running AWS CLI commands + +**Note:** This command currently supports only hardware or software based one-time password (OTP) authenticators. Passkeys and U2F devices are not currently supported. \ No newline at end of file diff --git a/tests/unit/customizations/configure/test_mfalogin.py b/tests/unit/customizations/configure/test_mfalogin.py new file mode 100644 index 000000000000..972f90b6e28b --- /dev/null +++ b/tests/unit/customizations/configure/test_mfalogin.py @@ -0,0 +1,539 @@ +# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import datetime +import os +from unittest import mock + +import botocore.session +from botocore.exceptions import ClientError, ProfileNotFound + +from awscli.customizations.configure.mfalogin import ( + ConfigureMFALoginCommand, + InteractiveMFAPrompter, +) +from awscli.testutils import unittest + + +class TestInteractiveMFAPrompter(unittest.TestCase): + def test_get_value_with_response(self): + prompter = InteractiveMFAPrompter() + # Mock the entire compat_input function, not just the return value + with mock.patch( + 'awscli.customizations.configure.mfalogin.compat_input' + ) as mock_input: + mock_input.return_value = 'response' + self.assertEqual( + prompter.get_value('current', 'prompt'), 'response' + ) + + def test_get_value_with_no_response(self): + prompter = InteractiveMFAPrompter() + # Mock the entire compat_input function, not just the return value + with mock.patch( + 'awscli.customizations.configure.mfalogin.compat_input' + ) as mock_input: + mock_input.return_value = '' + self.assertEqual( + prompter.get_value('current', 'prompt'), 'current' + ) + + +class TestConfigureMFALoginCommand(unittest.TestCase): + def setUp(self): + self.session = mock.Mock() + self.session.get_scoped_config.return_value = {} + self.session.get_credentials.return_value = mock.Mock() + # Add available_profiles to the session mock + self.session.available_profiles = ['default', 'test'] + self.prompter = mock.Mock() + self.config_writer = mock.Mock() + self.command = ConfigureMFALoginCommand( + self.session, + prompter=self.prompter, + config_writer=self.config_writer, + ) + self.parsed_args = mock.Mock() + self.parsed_args.profile = None + self.parsed_args.update_profile = None + self.parsed_args.duration_seconds = None + self.parsed_args.serial_number = None + self.parsed_globals = mock.Mock() + # Set profile in parsed_globals + self.parsed_globals.profile = 'default' + + def test_no_credentials_found(self): + # Mock botocore.session.Session + mock_session = mock.Mock() + mock_session.get_credentials.return_value = None + mock_session.available_profiles = ['default'] + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch( + 'sys.stdin.isatty', return_value=False + ): # Non-interactive + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "Unable to locate credentials for profile default\n" + ) + + def test_profile_not_found(self): + # Set profile to a non-existent profile + self.parsed_globals.profile = 'nonexistent' + + # Mock botocore.session.Session + mock_session = mock.Mock() + mock_session.available_profiles = ['default', 'test'] + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "The profile (nonexistent) could not be found. \n" + ) + + def test_no_mfa_serial_provided(self): + # Mock botocore.session.Session + mock_session = mock.Mock() + mock_session.get_credentials.return_value = mock.Mock() + mock_session.get_scoped_config.return_value = {} + mock_session.available_profiles = ['default'] + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch('sys.stdin.isatty', return_value=True): + self.prompter.get_value.return_value = 'None' + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "MFA serial number or ARN is required\n" + ) + + def test_no_token_code_provided(self): + # Mock botocore.session.Session + mock_session = mock.Mock() + mock_session.get_credentials.return_value = mock.Mock() + mock_session.get_scoped_config.return_value = {} + mock_session.available_profiles = ['default'] + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch('sys.stdin.isatty', return_value=True): + self.prompter.get_value.side_effect = [ + 'arn:aws:iam::123456789012:mfa/user', + 'None', + ] + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "MFA token code is required\n" + ) + + def test_sts_client_error(self): + # Mock botocore.session.Session + mock_session = mock.Mock() + mock_session.get_credentials.return_value = mock.Mock() + mock_session.get_scoped_config.return_value = {} + mock_session.available_profiles = ['default'] + + sts_client = mock.Mock() + sts_client.get_session_token.side_effect = ClientError( + { + 'Error': { + 'Code': 'InvalidClientTokenId', + 'Message': 'Test error', + } + }, + 'GetSessionToken', + ) + mock_session.create_client.return_value = sts_client + + with mock.patch('botocore.session.Session', return_value=mock_session): + self.prompter.get_value.side_effect = [ + 'arn:aws:iam::123456789012:mfa/user', + '123456', + 'session-test', + ] + + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + mock.ANY + ) # Just check it was called + + def test_successful_mfa_login(self): + # Setup + self.prompter.get_value.side_effect = [ + 'arn:aws:iam::123456789012:mfa/user', + '123456', + 'session-test', + ] + + expiration = datetime.datetime(2023, 5, 19, 18, 6, 10) + sts_response = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE', + 'Expiration': expiration, + } + } + + # Mock botocore.session.Session + mock_session = mock.Mock() + mock_session.get_credentials.return_value = mock.Mock() + mock_session.get_scoped_config.return_value = {} + mock_session.available_profiles = ['default'] + + sts_client = mock.Mock() + sts_client.get_session_token.return_value = sts_response + mock_session.create_client.return_value = sts_client + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch('sys.stdin.isatty', return_value=True): + with mock.patch( + 'os.path.expanduser', return_value='/tmp/credentials' + ): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + # Verify + self.assertEqual(rc, 0) + + # Check STS was called correctly + sts_client.get_session_token.assert_called_with( + DurationSeconds=43200, + SerialNumber='arn:aws:iam::123456789012:mfa/user', + TokenCode='123456', + ) + + # Check config writer was called correctly + expected_values = { + '__section__': 'session-test', + 'aws_access_key_id': 'ASIAIOSFODNN7EXAMPLE', + 'aws_secret_access_key': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'aws_session_token': 'AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE', + '#expiration': f"# Credentials expire at: {expiration.strftime('%Y-%m-%d %H:%M:%S UTC')}", + } + + self.config_writer.update_config.assert_called_with( + expected_values, '/tmp/credentials' + ) + + def test_serial_number_from_parameter(self): + # Setup - use serial number from parameter + self.parsed_args.serial_number = ( + 'arn:aws:iam::123456789012:mfa/user-param' + ) + + # Mock botocore.session.Session + mock_session = mock.Mock() + mock_session.get_credentials.return_value = mock.Mock() + mock_session.get_scoped_config.return_value = { + 'mfa_serial': 'arn:aws:iam::123456789012:mfa/user-config' + } + mock_session.available_profiles = ['default'] + + sts_client = mock.Mock() + expiration = datetime.datetime(2023, 5, 19, 18, 6, 10) + sts_response = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': expiration, + } + } + sts_client.get_session_token.return_value = sts_response + mock_session.create_client.return_value = sts_client + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch('sys.stdin.isatty', return_value=True): + self.prompter.get_value.side_effect = [ + '123456', + 'session-test', + ] + with mock.patch( + 'os.path.expanduser', return_value='/tmp/credentials' + ): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + # Verify + self.assertEqual(rc, 0) + + # Check that the parameter value was used instead of the config value + sts_client.get_session_token.assert_called_with( + DurationSeconds=43200, + SerialNumber='arn:aws:iam::123456789012:mfa/user-param', + TokenCode='123456', + ) + + def test_missing_default_profile_interactive(self): + """Test prompting for credentials when no default profile exists in interactive mode.""" + self.parsed_globals.profile = None # Use default profile + + # Mock botocore.session.Session to raise ProfileNotFound + with mock.patch('botocore.session.Session') as mock_session_class: + mock_session_class.side_effect = ProfileNotFound(profile='default') + + # Mock sys.stdin.isatty to return True (interactive) + with mock.patch('sys.stdin.isatty', return_value=True): + # Mock the _handle_missing_default_profile method + with mock.patch.object( + self.command, + '_handle_missing_default_profile', + return_value=0, + ) as mock_handle: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + self.assertEqual(rc, 0) + mock_handle.assert_called_once_with( + self.parsed_args, 43200 + ) + + def test_missing_default_profile_non_interactive(self): + """Test error when no default profile exists in non-interactive mode.""" + self.parsed_globals.profile = None # Use default profile + + # Mock botocore.session.Session to raise ProfileNotFound + with mock.patch('botocore.session.Session') as mock_session_class: + mock_session_class.side_effect = ProfileNotFound(profile='default') + + # Mock sys.stdin.isatty to return False (non-interactive) + with mock.patch('sys.stdin.isatty', return_value=False): + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "The profile (default) could not be found. \n" + ) + + def test_handle_missing_default_profile_success(self): + """Test successful credential prompting and MFA login when no default profile exists.""" + # Setup mock responses for prompting + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key + 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key + ] + self.prompter.get_value.side_effect = [ + 'arn:aws:iam::123456789012:mfa/user', # MFA serial + '123456', # MFA token + 'session-test', # profile name + ] + + # Mock STS response + expiration = datetime.datetime(2023, 5, 19, 18, 6, 10) + sts_response = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': expiration, + } + } + + # Mock botocore session and STS client + mock_session = mock.Mock() + sts_client = mock.Mock() + sts_client.get_session_token.return_value = sts_response + mock_session.create_client.return_value = sts_client + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch( + 'os.path.expanduser', return_value='/tmp/credentials' + ): + with mock.patch('sys.stdout'): + rc = self.command._handle_missing_default_profile( + self.parsed_args, 43200 + ) + + # Verify success + self.assertEqual(rc, 0) + + # Verify STS call + sts_client.get_session_token.assert_called_with( + DurationSeconds=43200, + SerialNumber='arn:aws:iam::123456789012:mfa/user', + TokenCode='123456', + ) + + # Verify only the session profile was written + self.assertEqual(self.config_writer.update_config.call_count, 1) + + def test_handle_missing_default_profile_missing_access_key(self): + """Test error when access key is not provided.""" + self.prompter.get_credential_value.return_value = ( + None # No access key provided + ) + + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._handle_missing_default_profile( + self.parsed_args, 43200 + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "AWS Access Key ID is required\n" + ) + + def test_handle_missing_default_profile_missing_secret_key(self): + """Test error when secret key is not provided.""" + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key provided + None, # secret key not provided + ] + + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._handle_missing_default_profile( + self.parsed_args, 43200 + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "AWS Secret Access Key is required\n" + ) + + def test_credential_value_prompting_clean_display(self): + """Test that credential prompting doesn't show default values.""" + prompter = InteractiveMFAPrompter() + with mock.patch( + 'awscli.customizations.configure.mfalogin.compat_input' + ) as mock_input: + mock_input.return_value = 'test-value' + result = prompter.get_credential_value( + 'None', 'aws_access_key_id', 'AWS Access Key ID' + ) + + # Verify the prompt doesn't show [None] or any default value + mock_input.assert_called_with('AWS Access Key ID: ') + self.assertEqual(result, 'test-value') + + def test_handle_missing_default_profile_sts_error(self): + """Test STS error handling in missing default profile scenario.""" + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key + 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key + ] + self.prompter.get_value.side_effect = [ + 'arn:aws:iam::123456789012:mfa/user', # MFA serial + '123456', # MFA token + 'session-test', # profile name + ] + + # Mock STS client to raise an error + mock_session = mock.Mock() + sts_client = mock.Mock() + sts_client.get_session_token.side_effect = ClientError( + { + 'Error': { + 'Code': 'InvalidClientTokenId', + 'Message': 'Invalid credentials', + } + }, + 'GetSessionToken', + ) + mock_session.create_client.return_value = sts_client + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._handle_missing_default_profile( + self.parsed_args, 43200 + ) + + self.assertEqual(rc, 1) + # Verify error message was written + mock_stderr.write.assert_called() + self.assertIn( + 'An error occurred', str(mock_stderr.write.call_args) + ) + + def test_non_interactive_missing_mfa_serial(self): + """Test non-interactive mode when MFA serial is missing.""" + mock_session = mock.Mock() + mock_session.get_credentials.return_value = mock.Mock() + mock_session.get_scoped_config.return_value = {} # No mfa_serial in config + mock_session.available_profiles = ['default'] + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch( + 'sys.stdin.isatty', return_value=False + ): # Non-interactive + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "MFA serial number or ARN is required\n" + ) + + def test_non_interactive_missing_token_code(self): + """Test non-interactive mode when token code would be prompted.""" + mock_session = mock.Mock() + mock_session.get_credentials.return_value = mock.Mock() + mock_session.get_scoped_config.return_value = { + 'mfa_serial': 'arn:aws:iam::123456789012:mfa/user' + } + mock_session.available_profiles = ['default'] + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch( + 'sys.stdin.isatty', return_value=False + ): # Non-interactive + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "MFA token code is required\n" + ) + + def test_empty_credential_input_handling(self): + """Test handling of empty credential inputs.""" + self.prompter.get_credential_value.return_value = '' # Empty string + + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._handle_missing_default_profile( + self.parsed_args, 43200 + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "AWS Access Key ID is required\n" + ) From fb8ae692af1eaf36a665dc83929f537b7f1c0c6b Mon Sep 17 00:00:00 2001 From: Liam <101819487+liwadman@users.noreply.github.com> Date: Tue, 19 Aug 2025 11:29:43 -0700 Subject: [PATCH 03/17] Update .changes/next-release/configure-tempcreds.json Co-authored-by: Alex Shovlin --- .changes/next-release/configure-tempcreds.json | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/.changes/next-release/configure-tempcreds.json b/.changes/next-release/configure-tempcreds.json index c3c6ce23acec..41f92b036cd6 100644 --- a/.changes/next-release/configure-tempcreds.json +++ b/.changes/next-release/configure-tempcreds.json @@ -1,12 +1,11 @@ -[] { "type": "enhancement", - "category": "``Configure``", - "description": "Added Support for AWS temp creds with AWS configure, e.g. CLI will ask for aws_session_token if the access key id is temporary" + "category": "``configure``", + "description": "Added support for temporary credentials with ``aws configure``. The CLI will prompt for an ``aws_session_token`` if the provided access key ID is temporary." }, { "type": "enhancement", - "category": "``Configure``", - "description": "Added new sub-command to aws confugre, `mfa-login`, which is an interactive wrapper for the aws sts get-session-token command" + "category": "``configure``", + "description": "Added the new ``aws configure mfa-login` command, which creates a profile with temporary credentials corresponding to an IAM user with an MFA code." }, ] \ No newline at end of file From 6afe5ef602b2c32703b16abe991350bf72abd4fc Mon Sep 17 00:00:00 2001 From: liam wadman Date: Tue, 19 Aug 2025 11:29:30 -0700 Subject: [PATCH 04/17] removed unneeded logic, simplified logic for aws_session_token management --- awscli/customizations/configure/configure.py | 46 +++++--------------- 1 file changed, 11 insertions(+), 35 deletions(-) diff --git a/awscli/customizations/configure/configure.py b/awscli/customizations/configure/configure.py index 15bdbbd2a053..bcf1e6936a85 100644 --- a/awscli/customizations/configure/configure.py +++ b/awscli/customizations/configure/configure.py @@ -102,6 +102,7 @@ class ConfigureCommand(BasicCommand): # (logical_name, config_name, prompt_text) ('aws_access_key_id', "AWS Access Key ID"), ('aws_secret_access_key', "AWS Secret Access Key"), + ('aws_session_token', "AWS Session Token"), ('region', "Default region name"), ('output', "Default output format"), ] @@ -115,17 +116,13 @@ def __init__(self, session, prompter=None, config_writer=None): config_writer = ConfigFileWriter() self._config_writer = config_writer + def _needs_session_token(self, new_values): + """Check if session token is needed based on access key ID.""" + access_key = new_values.get('aws_access_key_id') + return access_key and access_key.startswith('ASIA') + def _run_main(self, parsed_args, parsed_globals): # Called when invoked with no args "aws configure" - # Check if there are any remaining unparsed arguments that might be invalid subcommands - if hasattr(parsed_args, 'remaining') and parsed_args.remaining: - sys.stderr.write( - f"Invalid subcommand: {' '.join(parsed_args.remaining)}\n" - ) - sys.stderr.write( - "Valid subcommands are: list, get, set, add-model, import, list-profiles, sso, sso-session, mfa-login, export-credentials\n" - ) - return 1 new_values = {} # This is the config from the config file scoped to a specific @@ -135,10 +132,11 @@ def _run_main(self, parsed_args, parsed_globals): except ProfileNotFound: config = {} - # Track if we need to prompt for session token - needs_session_token = False - for config_name, prompt_text in self.VALUES_TO_PROMPT: + # Skip session token if not needed + if config_name == 'aws_session_token' and not self._needs_session_token(new_values): + continue + current_value = config.get(config_name) new_value = self._prompter.get_value( current_value, config_name, prompt_text @@ -146,32 +144,10 @@ def _run_main(self, parsed_args, parsed_globals): if new_value is not None and new_value != current_value: new_values[config_name] = new_value - # Check if this is a temporary credential (starts with ASIA) - if ( - config_name == 'aws_access_key_id' - and new_value - and new_value.startswith('ASIA') - ): - needs_session_token = True - - # Prompt for session token after secret key but before region - if config_name == 'aws_secret_access_key' and needs_session_token: - session_token_current = config.get('aws_session_token') - session_token_new = self._prompter.get_value( - session_token_current, - 'aws_session_token', - 'AWS Session Token', - ) - if ( - session_token_new is not None - and session_token_new != session_token_current - ): - new_values['aws_session_token'] = session_token_new - # Remove session token for non-temporary credentials if ( 'aws_access_key_id' in new_values - and not needs_session_token + and not self._needs_session_token(new_values) and config.get('aws_session_token') ): new_values['aws_session_token'] = None From 505da651f9877b2b53de3450d798b97d607ea8fc Mon Sep 17 00:00:00 2001 From: liam wadman Date: Tue, 19 Aug 2025 12:05:34 -0700 Subject: [PATCH 05/17] deterministic profile name, remove duplicative code --- awscli/customizations/configure/mfalogin.py | 55 +++++++++++---------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index ebb43ee3df21..f58f5d2f14cf 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -134,20 +134,41 @@ def _check_profile_exists(self, profile_name): session = botocore.session.Session() return profile_name in session.available_profiles + def _generate_profile_name_from_mfa(self, mfa_serial): + """Generate a deterministic profile name from MFA serial/ARN.""" + if mfa_serial.startswith('arn:aws:iam::'): + # Parse ARN: arn:aws:iam::123456789012:mfa/device-name + parts = mfa_serial.split(':') + account_id = parts[4] + device_name = parts[5].split('/')[-1] # Get device name after 'mfa/' + return f"{account_id}-{device_name}" + else: + # Assume it's just a serial number + return f"session-{mfa_serial}" + + def _get_target_profile(self, parsed_args, mfa_serial=None): + """Get or generate the target profile name.""" + target_profile = parsed_args.update_profile + if not target_profile: + if mfa_serial: + target_profile = self._generate_profile_name_from_mfa(mfa_serial) + else: + target_profile = "session-temp" + if sys.stdin.isatty(): + target_profile = self._prompter.get_value( + target_profile, 'Profile to update' + ) + return target_profile + def _run_main(self, parsed_args, parsed_globals): # Get the source profile for credentials source_profile = parsed_globals.profile or 'default' - # Get the target profile to update - target_profile = parsed_args.update_profile - # Get duration seconds duration_seconds = ( parsed_args.duration_seconds or 43200 ) # Default 12 hours - # Import here to avoid circular imports - # Create a new session with the specified profile try: # Use botocore's native profile handling @@ -223,16 +244,8 @@ def _run_main(self, parsed_args, parsed_globals): sys.stderr.write("MFA token code is required\n") return 1 - # If no target profile is specified, generate a default name - if not target_profile: - random_suffix = ''.join( - random.choices(string.ascii_lowercase + string.digits, k=5) - ) - target_profile = f"session-{random_suffix}" - if sys.stdin.isatty(): - target_profile = self._prompter.get_value( - target_profile, 'Profile to update' - ) + # Get the target profile name + target_profile = self._get_target_profile(parsed_args, mfa_serial) # Call STS to get temporary credentials try: @@ -327,16 +340,8 @@ def _handle_missing_default_profile(self, parsed_args, duration_seconds): sys.stderr.write("MFA token code is required\n") return 1 - # Generate target profile name if not specified - target_profile = parsed_args.update_profile - if not target_profile: - random_suffix = ''.join( - random.choices(string.ascii_lowercase + string.digits, k=5) - ) - target_profile = f"session-{random_suffix}" - target_profile = self._prompter.get_value( - target_profile, 'Profile to update' - ) + # Get the target profile name + target_profile = self._get_target_profile(parsed_args, mfa_serial) # Create a temporary session with the provided credentials session = botocore.session.Session() From 6b62c61a4beba0094abeae3572800ebf1a7f5351 Mon Sep 17 00:00:00 2001 From: liam wadman Date: Tue, 19 Aug 2025 12:13:52 -0700 Subject: [PATCH 06/17] removed isatty() checks --- awscli/customizations/configure/mfalogin.py | 39 ++++++++------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index f58f5d2f14cf..0b316e1eefa5 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -154,10 +154,9 @@ def _get_target_profile(self, parsed_args, mfa_serial=None): target_profile = self._generate_profile_name_from_mfa(mfa_serial) else: target_profile = "session-temp" - if sys.stdin.isatty(): - target_profile = self._prompter.get_value( - target_profile, 'Profile to update' - ) + target_profile = self._prompter.get_value( + target_profile, 'Profile to update' + ) return target_profile def _run_main(self, parsed_args, parsed_globals): @@ -187,8 +186,8 @@ def _run_main(self, parsed_args, parsed_globals): # Get credentials credentials = session.get_credentials() if credentials is None: - # If no credentials found and using default profile and running interactively, prompt for them - if source_profile == 'default' and sys.stdin.isatty(): + # If no credentials found and using default profile, prompt for them + if source_profile == 'default': return self._handle_missing_default_profile( parsed_args, duration_seconds ) @@ -200,8 +199,8 @@ def _run_main(self, parsed_args, parsed_globals): source_config = session.get_scoped_config() except ProfileNotFound: - # If default profile not found and running interactively, prompt for credentials - if source_profile == 'default' and sys.stdin.isatty(): + # If default profile not found, prompt for credentials + if source_profile == 'default': return self._handle_missing_default_profile( parsed_args, duration_seconds ) @@ -221,26 +220,18 @@ def _run_main(self, parsed_args, parsed_globals): 'mfa_serial' ) if not mfa_serial: - if sys.stdin.isatty(): - mfa_serial = self._prompter.get_credential_value( - 'None', 'mfa_serial', 'MFA serial number or ARN' - ) - if not mfa_serial: - sys.stderr.write("MFA serial number or ARN is required\n") - return 1 - else: + mfa_serial = self._prompter.get_credential_value( + 'None', 'mfa_serial', 'MFA serial number or ARN' + ) + if not mfa_serial: sys.stderr.write("MFA serial number or ARN is required\n") return 1 # Get MFA token code - if sys.stdin.isatty(): - token_code = self._prompter.get_credential_value( - 'None', 'mfa_token', 'MFA token code' - ) - if not token_code: - sys.stderr.write("MFA token code is required\n") - return 1 - else: + token_code = self._prompter.get_credential_value( + 'None', 'mfa_token', 'MFA token code' + ) + if not token_code: sys.stderr.write("MFA token code is required\n") return 1 From 26f2cff2cb5ad25481156e3de235e042c368a379 Mon Sep 17 00:00:00 2001 From: liam wadman Date: Tue, 19 Aug 2025 12:29:14 -0700 Subject: [PATCH 07/17] removed duplicative code. --- awscli/customizations/configure/mfalogin.py | 191 ++++++++------------ 1 file changed, 80 insertions(+), 111 deletions(-) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index 0b316e1eefa5..98cdfa10788f 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -159,6 +159,62 @@ def _get_target_profile(self, parsed_args, mfa_serial=None): ) return target_profile + def _get_mfa_token(self): + """Prompt for MFA token code.""" + token_code = self._prompter.get_credential_value( + 'None', 'mfa_token', 'MFA token code' + ) + if not token_code: + sys.stderr.write("MFA token code is required\n") + return None + return token_code + + def _call_sts_get_session_token(self, sts_client, duration_seconds, mfa_serial, token_code): + """Call STS to get temporary credentials.""" + try: + response = sts_client.get_session_token( + DurationSeconds=duration_seconds, + SerialNumber=mfa_serial, + TokenCode=token_code, + ) + return response + except ClientError as e: + sys.stderr.write(f"An error occurred: {e}\n") + return None + + def _write_temporary_credentials(self, temp_credentials, target_profile): + """Write temporary credentials to the credentials file.""" + credentials_file = os.path.expanduser('~/.aws/credentials') + + credential_values = { + '__section__': target_profile, + 'aws_access_key_id': temp_credentials['AccessKeyId'], + 'aws_secret_access_key': temp_credentials['SecretAccessKey'], + 'aws_session_token': temp_credentials['SessionToken'], + } + + try: + expiration_time = temp_credentials['Expiration'].strftime( + '%Y-%m-%d %H:%M:%S UTC' + ) + except AttributeError: + expiration_time = str(temp_credentials['Expiration']) + + credential_values['#expiration'] = ( + f"# Credentials expire at: {expiration_time}" + ) + + self._config_writer.update_config(credential_values, credentials_file) + + sys.stdout.write( + f"Temporary credentials written to profile '{target_profile}'\n" + ) + sys.stdout.write(f"Credentials will expire at {expiration_time}\n") + sys.stdout.write( + f"To use these credentials, specify --profile {target_profile} when running AWS CLI commands\n" + ) + return 0 + def _run_main(self, parsed_args, parsed_globals): # Get the source profile for credentials source_profile = parsed_globals.profile or 'default' @@ -228,68 +284,25 @@ def _run_main(self, parsed_args, parsed_globals): return 1 # Get MFA token code - token_code = self._prompter.get_credential_value( - 'None', 'mfa_token', 'MFA token code' - ) + token_code = self._get_mfa_token() if not token_code: - sys.stderr.write("MFA token code is required\n") return 1 # Get the target profile name target_profile = self._get_target_profile(parsed_args, mfa_serial) # Call STS to get temporary credentials - try: - sts_client = session.create_client('sts') - response = sts_client.get_session_token( - DurationSeconds=duration_seconds, - SerialNumber=mfa_serial, - TokenCode=token_code, - ) - except ClientError as e: - sys.stderr.write(f"An error occurred: {e}\n") - return 1 - - # Extract credentials from response - temp_credentials = response['Credentials'] - - # Write credentials to the credentials file - credentials_file = os.path.expanduser('~/.aws/credentials') - - # Prepare the values to write - credential_values = { - '__section__': target_profile, - 'aws_access_key_id': temp_credentials['AccessKeyId'], - 'aws_secret_access_key': temp_credentials['SecretAccessKey'], - 'aws_session_token': temp_credentials['SessionToken'], - } - - # Get expiration time as a string - try: - expiration_time = temp_credentials['Expiration'].strftime( - '%Y-%m-%d %H:%M:%S UTC' - ) - except Exception: - # Handle case where Expiration might not be a datetime object - expiration_time = str(temp_credentials['Expiration']) - - # Add expiration as a comment in the config file - credential_values['#expiration'] = ( - f"# Credentials expire at: {expiration_time}" + sts_client = session.create_client('sts') + response = self._call_sts_get_session_token( + sts_client, duration_seconds, mfa_serial, token_code ) + if not response: + return 1 - # Write to credentials file - self._config_writer.update_config(credential_values, credentials_file) - - sys.stdout.write( - f"Temporary credentials written to profile '{target_profile}'\n" + # Write credentials and return + return self._write_temporary_credentials( + response['Credentials'], target_profile ) - sys.stdout.write(f"Credentials will expire at {expiration_time}\n") - sys.stdout.write( - f"To use these credentials, specify --profile {target_profile} when running AWS CLI commands\n" - ) - - return 0 def _handle_missing_default_profile(self, parsed_args, duration_seconds): """Handle the case where no default profile exists by prompting for credentials.""" @@ -324,73 +337,29 @@ def _handle_missing_default_profile(self, parsed_args, duration_seconds): return 1 # Get MFA token code - token_code = self._prompter.get_credential_value( - 'None', 'mfa_token', 'MFA token code' - ) + token_code = self._get_mfa_token() if not token_code: - sys.stderr.write("MFA token code is required\n") return 1 # Get the target profile name target_profile = self._get_target_profile(parsed_args, mfa_serial) - # Create a temporary session with the provided credentials + # Create STS client with the provided credentials session = botocore.session.Session() - - try: - # Create STS client with the provided credentials - sts_client = session.create_client( - 'sts', - aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - ) - - # Call STS to get temporary credentials - response = sts_client.get_session_token( - DurationSeconds=duration_seconds, - SerialNumber=mfa_serial, - TokenCode=token_code, - ) - except ClientError as e: - sys.stderr.write(f"An error occurred: {e}\n") - return 1 - - # Extract credentials from response - temp_credentials = response['Credentials'] - - # Write credentials to the credentials file - credentials_file = os.path.expanduser('~/.aws/credentials') - - # Prepare the values to write - credential_values = { - '__section__': target_profile, - 'aws_access_key_id': temp_credentials['AccessKeyId'], - 'aws_secret_access_key': temp_credentials['SecretAccessKey'], - 'aws_session_token': temp_credentials['SessionToken'], - } - - # Get expiration time as a string - try: - expiration_time = temp_credentials['Expiration'].strftime( - '%Y-%m-%d %H:%M:%S UTC' - ) - except Exception: - expiration_time = str(temp_credentials['Expiration']) - - # Add expiration as a comment in the config file - credential_values['#expiration'] = ( - f"# Credentials expire at: {expiration_time}" + sts_client = session.create_client( + 'sts', + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, ) - # Write to credentials file - self._config_writer.update_config(credential_values, credentials_file) - - sys.stdout.write( - f"Temporary credentials written to profile '{target_profile}'\n" - ) - sys.stdout.write(f"Credentials will expire at {expiration_time}\n") - sys.stdout.write( - f"To use these credentials, specify --profile {target_profile} when running AWS CLI commands\n" + # Call STS to get temporary credentials + response = self._call_sts_get_session_token( + sts_client, duration_seconds, mfa_serial, token_code ) + if not response: + return 1 - return 0 + # Write credentials and return + return self._write_temporary_credentials( + response['Credentials'], target_profile + ) From 6ecd5f85ed43ca22bf572c6da1a857ee6fa32f9e Mon Sep 17 00:00:00 2001 From: liam wadman Date: Tue, 19 Aug 2025 12:39:32 -0700 Subject: [PATCH 08/17] moved out readme-like files, defaulted variable for session duration --- awscli/customizations/configure/mfalogin.py | 33 ++++----------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index 98cdfa10788f..b9fbd8bba2e0 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -51,35 +51,15 @@ class ConfigureMFALoginCommand(BasicCommand): """Configures MFA login for AWS CLI by creating temporary credentials.""" NAME = 'mfa-login' - DESCRIPTION = ( - 'Sets up temporary credentials for MFA authentication. ' - 'This command creates a new profile with temporary credentials ' - 'obtained using the AWS STS get-session-token API.' + DESCRIPTION = BasicCommand.FROM_FILE( + 'configure', 'mfa-login', '_description.rst' ) SYNOPSIS = ( 'aws configure mfa-login [--profile profile-name] ' '[--update-profile profile-to-update] [--duration-seconds seconds] ' '[--serial-number mfa-serial-number]' ) - EXAMPLES = ( - 'To create a new profile with temporary credentials::\n' - '\n' - ' $ aws configure mfa-login\n' - ' MFA serial number or ARN: arn:aws:iam::123456789012:mfa/user\n' - ' MFA token code: 123456\n' - ' Profile to update [session-12345]:\n' - '\n' - 'To update an existing profile with temporary credentials::\n' - '\n' - ' $ aws configure mfa-login --profile myprofile --update-profile mytemp\n' - ' MFA token code: 123456\n' - '\n' - 'To specify the MFA device serial number or ARN directly::\n' - '\n' - ' $ aws configure mfa-login --serial-number arn:aws:iam::123456789012:mfa/user\n' - ' MFA token code: 123456\n' - ' Profile to update [session-12345]:\n' - ) + EXAMPLES = BasicCommand.FROM_FILE('configure', 'mfa-login', '_examples.rst') ARG_TABLE = [ { 'name': 'profile', @@ -102,11 +82,12 @@ class ConfigureMFALoginCommand(BasicCommand): 'name': 'duration-seconds', 'help_text': ( 'The duration, in seconds, that the credentials should remain valid. ' - 'Default is 43200 seconds (12 hours).' + 'Minimum is 900 seconds (15 minutes), maximum is 129600 seconds (36 hours).' ), 'action': 'store', 'required': False, 'cli_type_name': 'integer', + 'default': 43200, }, { 'name': 'serial-number', @@ -220,9 +201,7 @@ def _run_main(self, parsed_args, parsed_globals): source_profile = parsed_globals.profile or 'default' # Get duration seconds - duration_seconds = ( - parsed_args.duration_seconds or 43200 - ) # Default 12 hours + duration_seconds = parsed_args.duration_seconds # Create a new session with the specified profile try: From aa803db9f28fe7123d60aa5b7275ccec451efedd Mon Sep 17 00:00:00 2001 From: liam wadman Date: Tue, 19 Aug 2025 12:42:31 -0700 Subject: [PATCH 09/17] added support for env var credential file --- awscli/customizations/configure/mfalogin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index b9fbd8bba2e0..cc87791a7819 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -165,7 +165,7 @@ def _call_sts_get_session_token(self, sts_client, duration_seconds, mfa_serial, def _write_temporary_credentials(self, temp_credentials, target_profile): """Write temporary credentials to the credentials file.""" - credentials_file = os.path.expanduser('~/.aws/credentials') + credentials_file = os.path.expanduser(self._session.get_config_variable('credentials_file')) credential_values = { '__section__': target_profile, From c02b7c5941dd4197973b754130baef4a8ca3a4dc Mon Sep 17 00:00:00 2001 From: liam wadman Date: Tue, 19 Aug 2025 12:50:13 -0700 Subject: [PATCH 10/17] clearer prompt on MFA device arn may be required --- awscli/customizations/configure/mfalogin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index cc87791a7819..04f1b337234d 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -259,7 +259,7 @@ def _run_main(self, parsed_args, parsed_globals): 'None', 'mfa_serial', 'MFA serial number or ARN' ) if not mfa_serial: - sys.stderr.write("MFA serial number or ARN is required\n") + sys.stderr.write("MFA serial number or MFA device ARN is required\n") return 1 # Get MFA token code From c5b2ab6e8ab25a849b4249c174c5646ef5fce0e2 Mon Sep 17 00:00:00 2001 From: liam wadman Date: Tue, 19 Aug 2025 12:58:40 -0700 Subject: [PATCH 11/17] made simpler to read and maintain, changed credential file expiration entry --- awscli/customizations/configure/mfalogin.py | 110 +++++++++----------- 1 file changed, 51 insertions(+), 59 deletions(-) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index 04f1b337234d..dd3230d991a6 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -163,6 +163,48 @@ def _call_sts_get_session_token(self, sts_client, duration_seconds, mfa_serial, sys.stderr.write(f"An error occurred: {e}\n") return None + def _setup_session_with_profile(self, source_profile): + """Setup and validate session with the given profile.""" + try: + session = botocore.session.Session(profile=source_profile) + + if (source_profile not in session.available_profiles + and source_profile != 'default'): + sys.stderr.write(f"The profile ({source_profile}) could not be found. \n") + return None, None + + credentials = session.get_credentials() + if credentials is None: + if source_profile == 'default': + return None, None # Signal to handle missing default profile + else: + sys.stderr.write(f"Unable to locate credentials for profile {source_profile}\n") + return None, None + + return session, session.get_scoped_config() + + except ProfileNotFound: + if source_profile == 'default': + return None, None # Signal to handle missing default profile + else: + sys.stderr.write(f"The profile ({source_profile}) could not be found. \n") + return None, None + except Exception as e: + sys.stderr.write(f"Error accessing profile {source_profile}: {str(e)}\n") + return None, None + + def _resolve_mfa_serial(self, parsed_args, source_config): + """Resolve MFA serial from args, config, or prompt.""" + mfa_serial = parsed_args.serial_number or source_config.get('mfa_serial') + if not mfa_serial: + mfa_serial = self._prompter.get_credential_value( + 'None', 'mfa_serial', 'MFA serial number or ARN' + ) + if not mfa_serial: + sys.stderr.write("MFA serial number or MFA device ARN is required\n") + return None + return mfa_serial + def _write_temporary_credentials(self, temp_credentials, target_profile): """Write temporary credentials to the credentials file.""" credentials_file = os.path.expanduser(self._session.get_config_variable('credentials_file')) @@ -181,8 +223,8 @@ def _write_temporary_credentials(self, temp_credentials, target_profile): except AttributeError: expiration_time = str(temp_credentials['Expiration']) - credential_values['#expiration'] = ( - f"# Credentials expire at: {expiration_time}" + credential_values['#Credentials expire at: '] = ( + f"{expiration_time}" ) self._config_writer.update_config(credential_values, credentials_file) @@ -197,70 +239,20 @@ def _write_temporary_credentials(self, temp_credentials, target_profile): return 0 def _run_main(self, parsed_args, parsed_globals): - # Get the source profile for credentials source_profile = parsed_globals.profile or 'default' - - # Get duration seconds duration_seconds = parsed_args.duration_seconds - # Create a new session with the specified profile - try: - # Use botocore's native profile handling - session = botocore.session.Session(profile=source_profile) - - # Check if profile exists - if ( - source_profile not in session.available_profiles - and source_profile != 'default' - ): - sys.stderr.write( - f"The profile ({source_profile}) could not be found. \n" - ) - return 1 - - # Get credentials - credentials = session.get_credentials() - if credentials is None: - # If no credentials found and using default profile, prompt for them - if source_profile == 'default': - return self._handle_missing_default_profile( - parsed_args, duration_seconds - ) - else: - sys.stderr.write( - f"Unable to locate credentials for profile {source_profile}\n" - ) - return 1 - - source_config = session.get_scoped_config() - except ProfileNotFound: - # If default profile not found, prompt for credentials + # Setup session and validate profile + session, source_config = self._setup_session_with_profile(source_profile) + if session is None: if source_profile == 'default': - return self._handle_missing_default_profile( - parsed_args, duration_seconds - ) - else: - sys.stderr.write( - f"The profile ({source_profile}) could not be found. \n" - ) - return 1 - except Exception as e: - sys.stderr.write( - f"Error accessing profile {source_profile}: {str(e)}\n" - ) + return self._handle_missing_default_profile(parsed_args, duration_seconds) return 1 - # Get MFA serial number - mfa_serial = parsed_args.serial_number or source_config.get( - 'mfa_serial' - ) + # Resolve MFA serial number + mfa_serial = self._resolve_mfa_serial(parsed_args, source_config) if not mfa_serial: - mfa_serial = self._prompter.get_credential_value( - 'None', 'mfa_serial', 'MFA serial number or ARN' - ) - if not mfa_serial: - sys.stderr.write("MFA serial number or MFA device ARN is required\n") - return 1 + return 1 # Get MFA token code token_code = self._get_mfa_token() From aa5e0084dc51586382fab9701598a0b3332443be Mon Sep 17 00:00:00 2001 From: liam wadman Date: Tue, 19 Aug 2025 13:09:11 -0700 Subject: [PATCH 12/17] friendlier prompting, cleaner logic --- awscli/customizations/configure/mfalogin.py | 63 ++++++++----------- .../customizations/configure/test_mfalogin.py | 10 +-- 2 files changed, 32 insertions(+), 41 deletions(-) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index dd3230d991a6..e799df3fb387 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -101,6 +101,14 @@ class ConfigureMFALoginCommand(BasicCommand): }, ] + # Values to prompt for during interactive setup + VALUES_TO_PROMPT = [ + ('aws_access_key_id', 'AWS Access Key ID'), + ('aws_secret_access_key', 'AWS Secret Access Key'), + ('mfa_serial', 'MFA serial number or ARN'), + ('mfa_token', 'MFA token code'), + ] + def __init__(self, session, prompter=None, config_writer=None): super().__init__(session) if prompter is None: @@ -246,7 +254,7 @@ def _run_main(self, parsed_args, parsed_globals): session, source_config = self._setup_session_with_profile(source_profile) if session is None: if source_profile == 'default': - return self._handle_missing_default_profile(parsed_args, duration_seconds) + return self._handle_interactive_prompting(parsed_args, duration_seconds) return 1 # Resolve MFA serial number @@ -275,57 +283,40 @@ def _run_main(self, parsed_args, parsed_globals): response['Credentials'], target_profile ) - def _handle_missing_default_profile(self, parsed_args, duration_seconds): - """Handle the case where no default profile exists by prompting for credentials.""" + def _handle_interactive_prompting(self, parsed_args, duration_seconds): + """Handle the case where no default profile exists, and there is no profile explicitly named as a configuration source""" sys.stdout.write( - "No default profile found. Please provide your AWS credentials:\n" - ) - - # Prompt for access key ID - access_key = self._prompter.get_credential_value( - 'None', 'aws_access_key_id', 'AWS Access Key ID' - ) - if not access_key or access_key == 'None': - sys.stderr.write("AWS Access Key ID is required\n") - return 1 - - # Prompt for secret access key - secret_key = self._prompter.get_credential_value( - 'None', 'aws_secret_access_key', 'AWS Secret Access Key' + "Please provide your AWS credentials:\n" ) - if not secret_key or secret_key == 'None': - sys.stderr.write("AWS Secret Access Key is required\n") - return 1 - # Get MFA serial number - mfa_serial = parsed_args.serial_number - if not mfa_serial: - mfa_serial = self._prompter.get_credential_value( - 'None', 'mfa_serial', 'MFA serial number or ARN' + values = {} + for config_name, prompt_text in self.VALUES_TO_PROMPT: + if config_name == 'mfa_serial' and parsed_args.serial_number: + values[config_name] = parsed_args.serial_number + continue + + value = self._prompter.get_credential_value( + 'None', config_name, prompt_text ) - if not mfa_serial: - sys.stderr.write("MFA serial number or ARN is required\n") + if not value or value == 'None': + sys.stderr.write(f"{prompt_text} is required\n") return 1 - - # Get MFA token code - token_code = self._get_mfa_token() - if not token_code: - return 1 + values[config_name] = value # Get the target profile name - target_profile = self._get_target_profile(parsed_args, mfa_serial) + target_profile = self._get_target_profile(parsed_args, values['mfa_serial']) # Create STS client with the provided credentials session = botocore.session.Session() sts_client = session.create_client( 'sts', - aws_access_key_id=access_key, - aws_secret_access_key=secret_key, + aws_access_key_id=values['aws_access_key_id'], + aws_secret_access_key=values['aws_secret_access_key'], ) # Call STS to get temporary credentials response = self._call_sts_get_session_token( - sts_client, duration_seconds, mfa_serial, token_code + sts_client, duration_seconds, values['mfa_serial'], values['mfa_token'] ) if not response: return 1 diff --git a/tests/unit/customizations/configure/test_mfalogin.py b/tests/unit/customizations/configure/test_mfalogin.py index 972f90b6e28b..91e114024bf7 100644 --- a/tests/unit/customizations/configure/test_mfalogin.py +++ b/tests/unit/customizations/configure/test_mfalogin.py @@ -376,7 +376,7 @@ def test_handle_missing_default_profile_success(self): 'os.path.expanduser', return_value='/tmp/credentials' ): with mock.patch('sys.stdout'): - rc = self.command._handle_missing_default_profile( + rc = self.command._handle_interactive_prompting( self.parsed_args, 43200 ) @@ -400,7 +400,7 @@ def test_handle_missing_default_profile_missing_access_key(self): ) with mock.patch('sys.stderr') as mock_stderr: - rc = self.command._handle_missing_default_profile( + rc = self.command._handle_interactive_prompting( self.parsed_args, 43200 ) @@ -417,7 +417,7 @@ def test_handle_missing_default_profile_missing_secret_key(self): ] with mock.patch('sys.stderr') as mock_stderr: - rc = self.command._handle_missing_default_profile( + rc = self.command._handle_interactive_prompting( self.parsed_args, 43200 ) @@ -469,7 +469,7 @@ def test_handle_missing_default_profile_sts_error(self): with mock.patch('botocore.session.Session', return_value=mock_session): with mock.patch('sys.stderr') as mock_stderr: - rc = self.command._handle_missing_default_profile( + rc = self.command._handle_interactive_prompting( self.parsed_args, 43200 ) @@ -529,7 +529,7 @@ def test_empty_credential_input_handling(self): self.prompter.get_credential_value.return_value = '' # Empty string with mock.patch('sys.stderr') as mock_stderr: - rc = self.command._handle_missing_default_profile( + rc = self.command._handle_interactive_prompting( self.parsed_args, 43200 ) From 7bdfb861c7172307db3131ddb0b57dfd42a34c47 Mon Sep 17 00:00:00 2001 From: liam wadman Date: Tue, 19 Aug 2025 13:28:22 -0700 Subject: [PATCH 13/17] updated tests for all the improvements --- .../customizations/configure/test_mfalogin.py | 127 +++++++++++------- 1 file changed, 82 insertions(+), 45 deletions(-) diff --git a/tests/unit/customizations/configure/test_mfalogin.py b/tests/unit/customizations/configure/test_mfalogin.py index 91e114024bf7..9b4385c4187a 100644 --- a/tests/unit/customizations/configure/test_mfalogin.py +++ b/tests/unit/customizations/configure/test_mfalogin.py @@ -72,23 +72,42 @@ def setUp(self): self.parsed_globals.profile = 'default' def test_no_credentials_found(self): + # Setup mock responses for interactive prompting + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key + 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key + 'arn:aws:iam::123456789012:mfa/user', # MFA serial + '123456', # MFA token + ] + self.prompter.get_value.return_value = 'session-test' # profile name + # Mock botocore.session.Session mock_session = mock.Mock() mock_session.get_credentials.return_value = None mock_session.available_profiles = ['default'] + + # Mock STS for the interactive prompting path + sts_client = mock.Mock() + sts_client.get_session_token.return_value = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': datetime.datetime(2023, 5, 19, 18, 6, 10), + } + } + mock_session.create_client.return_value = sts_client with mock.patch('botocore.session.Session', return_value=mock_session): with mock.patch( 'sys.stdin.isatty', return_value=False ): # Non-interactive - with mock.patch('sys.stderr') as mock_stderr: - rc = self.command._run_main( - self.parsed_args, self.parsed_globals - ) - self.assertEqual(rc, 1) - mock_stderr.write.assert_called_with( - "Unable to locate credentials for profile default\n" - ) + with mock.patch('os.path.expanduser', return_value='/tmp/credentials'): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 0) # Should succeed via interactive prompting def test_profile_not_found(self): # Set profile to a non-existent profile @@ -117,14 +136,14 @@ def test_no_mfa_serial_provided(self): with mock.patch('botocore.session.Session', return_value=mock_session): with mock.patch('sys.stdin.isatty', return_value=True): - self.prompter.get_value.return_value = 'None' + self.prompter.get_credential_value.return_value = None with mock.patch('sys.stderr') as mock_stderr: rc = self.command._run_main( self.parsed_args, self.parsed_globals ) self.assertEqual(rc, 1) mock_stderr.write.assert_called_with( - "MFA serial number or ARN is required\n" + "MFA serial number or MFA device ARN is required\n" ) def test_no_token_code_provided(self): @@ -136,9 +155,9 @@ def test_no_token_code_provided(self): with mock.patch('botocore.session.Session', return_value=mock_session): with mock.patch('sys.stdin.isatty', return_value=True): - self.prompter.get_value.side_effect = [ + self.prompter.get_credential_value.side_effect = [ 'arn:aws:iam::123456789012:mfa/user', - 'None', + None, ] with mock.patch('sys.stderr') as mock_stderr: rc = self.command._run_main( @@ -169,11 +188,11 @@ def test_sts_client_error(self): mock_session.create_client.return_value = sts_client with mock.patch('botocore.session.Session', return_value=mock_session): - self.prompter.get_value.side_effect = [ + self.prompter.get_credential_value.side_effect = [ 'arn:aws:iam::123456789012:mfa/user', '123456', - 'session-test', ] + self.prompter.get_value.return_value = 'session-test' with mock.patch('sys.stderr') as mock_stderr: rc = self.command._run_main( @@ -186,11 +205,12 @@ def test_sts_client_error(self): def test_successful_mfa_login(self): # Setup - self.prompter.get_value.side_effect = [ + self.parsed_args.duration_seconds = 43200 + self.prompter.get_credential_value.side_effect = [ 'arn:aws:iam::123456789012:mfa/user', '123456', - 'session-test', ] + self.prompter.get_value.return_value = 'session-test' expiration = datetime.datetime(2023, 5, 19, 18, 6, 10) sts_response = { @@ -238,7 +258,7 @@ def test_successful_mfa_login(self): 'aws_access_key_id': 'ASIAIOSFODNN7EXAMPLE', 'aws_secret_access_key': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', 'aws_session_token': 'AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE', - '#expiration': f"# Credentials expire at: {expiration.strftime('%Y-%m-%d %H:%M:%S UTC')}", + '#Credentials expire at: ': f"{expiration.strftime('%Y-%m-%d %H:%M:%S UTC')}", } self.config_writer.update_config.assert_called_with( @@ -250,6 +270,7 @@ def test_serial_number_from_parameter(self): self.parsed_args.serial_number = ( 'arn:aws:iam::123456789012:mfa/user-param' ) + self.parsed_args.duration_seconds = 43200 # Mock botocore.session.Session mock_session = mock.Mock() @@ -274,10 +295,8 @@ def test_serial_number_from_parameter(self): with mock.patch('botocore.session.Session', return_value=mock_session): with mock.patch('sys.stdin.isatty', return_value=True): - self.prompter.get_value.side_effect = [ - '123456', - 'session-test', - ] + self.prompter.get_credential_value.return_value = '123456' + self.prompter.get_value.return_value = 'session-test' with mock.patch( 'os.path.expanduser', return_value='/tmp/credentials' ): @@ -306,10 +325,10 @@ def test_missing_default_profile_interactive(self): # Mock sys.stdin.isatty to return True (interactive) with mock.patch('sys.stdin.isatty', return_value=True): - # Mock the _handle_missing_default_profile method + # Mock the _handle_interactive_prompting method with mock.patch.object( self.command, - '_handle_missing_default_profile', + '_handle_interactive_prompting', return_value=0, ) as mock_handle: rc = self.command._run_main( @@ -318,41 +337,59 @@ def test_missing_default_profile_interactive(self): self.assertEqual(rc, 0) mock_handle.assert_called_once_with( - self.parsed_args, 43200 + self.parsed_args, None ) def test_missing_default_profile_non_interactive(self): """Test error when no default profile exists in non-interactive mode.""" self.parsed_globals.profile = None # Use default profile + + # Setup mock responses for interactive prompting + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key + 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key + 'arn:aws:iam::123456789012:mfa/user', # MFA serial + '123456', # MFA token + ] + self.prompter.get_value.return_value = 'session-test' # profile name - # Mock botocore.session.Session to raise ProfileNotFound - with mock.patch('botocore.session.Session') as mock_session_class: - mock_session_class.side_effect = ProfileNotFound(profile='default') - - # Mock sys.stdin.isatty to return False (non-interactive) + # Mock botocore.session.Session to return None credentials + mock_session = mock.Mock() + mock_session.get_credentials.return_value = None + mock_session.available_profiles = ['default'] + + with mock.patch('botocore.session.Session', return_value=mock_session): with mock.patch('sys.stdin.isatty', return_value=False): - with mock.patch('sys.stderr') as mock_stderr: - rc = self.command._run_main( - self.parsed_args, self.parsed_globals - ) + # Mock STS for the interactive prompting path + sts_client = mock.Mock() + sts_client.get_session_token.return_value = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': datetime.datetime(2023, 5, 19, 18, 6, 10), + } + } + mock_session.create_client.return_value = sts_client + + with mock.patch('os.path.expanduser', return_value='/tmp/credentials'): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) - self.assertEqual(rc, 1) - mock_stderr.write.assert_called_with( - "The profile (default) could not be found. \n" - ) + self.assertEqual(rc, 0) # Should succeed via interactive prompting def test_handle_missing_default_profile_success(self): """Test successful credential prompting and MFA login when no default profile exists.""" - # Setup mock responses for prompting + # Setup mock responses for prompting - now all via get_credential_value self.prompter.get_credential_value.side_effect = [ 'AKIAIOSFODNN7EXAMPLE', # access key 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key - ] - self.prompter.get_value.side_effect = [ 'arn:aws:iam::123456789012:mfa/user', # MFA serial '123456', # MFA token - 'session-test', # profile name ] + self.prompter.get_value.return_value = 'session-test' # profile name # Mock STS response expiration = datetime.datetime(2023, 5, 19, 18, 6, 10) @@ -446,12 +483,10 @@ def test_handle_missing_default_profile_sts_error(self): self.prompter.get_credential_value.side_effect = [ 'AKIAIOSFODNN7EXAMPLE', # access key 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key - ] - self.prompter.get_value.side_effect = [ 'arn:aws:iam::123456789012:mfa/user', # MFA serial '123456', # MFA token - 'session-test', # profile name ] + self.prompter.get_value.return_value = 'session-test' # profile name # Mock STS client to raise an error mock_session = mock.Mock() @@ -491,6 +526,7 @@ def test_non_interactive_missing_mfa_serial(self): with mock.patch( 'sys.stdin.isatty', return_value=False ): # Non-interactive + self.prompter.get_credential_value.return_value = None with mock.patch('sys.stderr') as mock_stderr: rc = self.command._run_main( self.parsed_args, self.parsed_globals @@ -498,7 +534,7 @@ def test_non_interactive_missing_mfa_serial(self): self.assertEqual(rc, 1) mock_stderr.write.assert_called_with( - "MFA serial number or ARN is required\n" + "MFA serial number or MFA device ARN is required\n" ) def test_non_interactive_missing_token_code(self): @@ -514,6 +550,7 @@ def test_non_interactive_missing_token_code(self): with mock.patch( 'sys.stdin.isatty', return_value=False ): # Non-interactive + self.prompter.get_credential_value.return_value = None with mock.patch('sys.stderr') as mock_stderr: rc = self.command._run_main( self.parsed_args, self.parsed_globals From afc8fc2cf675cd891cb2a2614dfe77b1c4ef9a7c Mon Sep 17 00:00:00 2001 From: liam wadman Date: Wed, 3 Sep 2025 13:11:58 -0700 Subject: [PATCH 14/17] removed unneeded arg table entry, removed uncalled functions --- awscli/customizations/configure/mfalogin.py | 23 ++++++--------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index e799df3fb387..b47dedc6d323 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -61,13 +61,6 @@ class ConfigureMFALoginCommand(BasicCommand): ) EXAMPLES = BasicCommand.FROM_FILE('configure', 'mfa-login', '_examples.rst') ARG_TABLE = [ - { - 'name': 'profile', - 'help_text': ('Use a specific profile from your credential file.'), - 'action': 'store', - 'required': False, - 'cli_type_name': 'string', - }, { 'name': 'update-profile', 'help_text': ( @@ -118,11 +111,6 @@ def __init__(self, session, prompter=None, config_writer=None): config_writer = ConfigFileWriter() self._config_writer = config_writer - def _check_profile_exists(self, profile_name): - """Check if a profile exists using botocore's native profile handling.""" - session = botocore.session.Session() - return profile_name in session.available_profiles - def _generate_profile_name_from_mfa(self, mfa_serial): """Generate a deterministic profile name from MFA serial/ARN.""" if mfa_serial.startswith('arn:aws:iam::'): @@ -174,7 +162,12 @@ def _call_sts_get_session_token(self, sts_client, duration_seconds, mfa_serial, def _setup_session_with_profile(self, source_profile): """Setup and validate session with the given profile.""" try: - session = botocore.session.Session(profile=source_profile) + # Use existing session if source_profile matches the CLI's profile + cli_profile = self._session.get_config_variable('profile') or 'default' + if source_profile == cli_profile: + session = self._session + else: + session = botocore.session.Session(profile=source_profile) if (source_profile not in session.available_profiles and source_profile != 'default'): @@ -231,10 +224,6 @@ def _write_temporary_credentials(self, temp_credentials, target_profile): except AttributeError: expiration_time = str(temp_credentials['Expiration']) - credential_values['#Credentials expire at: '] = ( - f"{expiration_time}" - ) - self._config_writer.update_config(credential_values, credentials_file) sys.stdout.write( From 9ed78eb8a49bf9c64002a844aab179cce17f8fc7 Mon Sep 17 00:00:00 2001 From: liam wadman Date: Wed, 3 Sep 2025 13:12:29 -0700 Subject: [PATCH 15/17] use session from global, do not create new --- awscli/customizations/configure/mfalogin.py | 50 ++++----------------- 1 file changed, 8 insertions(+), 42 deletions(-) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index b47dedc6d323..39539cdf173d 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -159,40 +159,7 @@ def _call_sts_get_session_token(self, sts_client, duration_seconds, mfa_serial, sys.stderr.write(f"An error occurred: {e}\n") return None - def _setup_session_with_profile(self, source_profile): - """Setup and validate session with the given profile.""" - try: - # Use existing session if source_profile matches the CLI's profile - cli_profile = self._session.get_config_variable('profile') or 'default' - if source_profile == cli_profile: - session = self._session - else: - session = botocore.session.Session(profile=source_profile) - - if (source_profile not in session.available_profiles - and source_profile != 'default'): - sys.stderr.write(f"The profile ({source_profile}) could not be found. \n") - return None, None - - credentials = session.get_credentials() - if credentials is None: - if source_profile == 'default': - return None, None # Signal to handle missing default profile - else: - sys.stderr.write(f"Unable to locate credentials for profile {source_profile}\n") - return None, None - - return session, session.get_scoped_config() - - except ProfileNotFound: - if source_profile == 'default': - return None, None # Signal to handle missing default profile - else: - sys.stderr.write(f"The profile ({source_profile}) could not be found. \n") - return None, None - except Exception as e: - sys.stderr.write(f"Error accessing profile {source_profile}: {str(e)}\n") - return None, None + def _resolve_mfa_serial(self, parsed_args, source_config): """Resolve MFA serial from args, config, or prompt.""" @@ -236,15 +203,14 @@ def _write_temporary_credentials(self, temp_credentials, target_profile): return 0 def _run_main(self, parsed_args, parsed_globals): - source_profile = parsed_globals.profile or 'default' duration_seconds = parsed_args.duration_seconds - # Setup session and validate profile - session, source_config = self._setup_session_with_profile(source_profile) - if session is None: - if source_profile == 'default': - return self._handle_interactive_prompting(parsed_args, duration_seconds) - return 1 + # Use the CLI session directly + credentials = self._session.get_credentials() + if credentials is None: + return self._handle_interactive_prompting(parsed_args, duration_seconds) + + source_config = self._session.get_scoped_config() # Resolve MFA serial number mfa_serial = self._resolve_mfa_serial(parsed_args, source_config) @@ -260,7 +226,7 @@ def _run_main(self, parsed_args, parsed_globals): target_profile = self._get_target_profile(parsed_args, mfa_serial) # Call STS to get temporary credentials - sts_client = session.create_client('sts') + sts_client = self._session.create_client('sts') response = self._call_sts_get_session_token( sts_client, duration_seconds, mfa_serial, token_code ) From e8b70a75c4752e725c03e818f0be3abc01f0cfb0 Mon Sep 17 00:00:00 2001 From: liam wadman Date: Wed, 3 Sep 2025 16:35:19 -0700 Subject: [PATCH 16/17] local tests passing --- awscli/customizations/configure/configure.py | 13 +- awscli/customizations/configure/mfalogin.py | 6 +- .../customizations/configure/test_mfalogin.py | 260 +++++++++--------- 3 files changed, 144 insertions(+), 135 deletions(-) diff --git a/awscli/customizations/configure/configure.py b/awscli/customizations/configure/configure.py index bcf1e6936a85..9160a09b5abe 100644 --- a/awscli/customizations/configure/configure.py +++ b/awscli/customizations/configure/configure.py @@ -121,6 +121,16 @@ def _needs_session_token(self, new_values): access_key = new_values.get('aws_access_key_id') return access_key and access_key.startswith('ASIA') + def _should_prompt_for_session_token(self, new_values, config): + """Determine if we should prompt for session token.""" + # Don't prompt if explicitly switching to long-term credentials + new_access_key = new_values.get('aws_access_key_id') + if new_access_key and not self._needs_session_token(new_values): + return False + + # Prompt if needed for temporary credentials or if already exists + return self._needs_session_token(new_values) or config.get('aws_session_token') + def _run_main(self, parsed_args, parsed_globals): # Called when invoked with no args "aws configure" @@ -133,8 +143,7 @@ def _run_main(self, parsed_args, parsed_globals): config = {} for config_name, prompt_text in self.VALUES_TO_PROMPT: - # Skip session token if not needed - if config_name == 'aws_session_token' and not self._needs_session_token(new_values): + if config_name == 'aws_session_token' and not self._should_prompt_for_session_token(new_values, config): continue current_value = config.get(config_name) diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py index 39539cdf173d..885e624c45a7 100644 --- a/awscli/customizations/configure/mfalogin.py +++ b/awscli/customizations/configure/mfalogin.py @@ -113,7 +113,7 @@ def __init__(self, session, prompter=None, config_writer=None): def _generate_profile_name_from_mfa(self, mfa_serial): """Generate a deterministic profile name from MFA serial/ARN.""" - if mfa_serial.startswith('arn:aws:iam::'): + if isinstance(mfa_serial, str) and mfa_serial.startswith('arn:aws:iam::'): # Parse ARN: arn:aws:iam::123456789012:mfa/device-name parts = mfa_serial.split(':') account_id = parts[4] @@ -159,8 +159,6 @@ def _call_sts_get_session_token(self, sts_client, duration_seconds, mfa_serial, sys.stderr.write(f"An error occurred: {e}\n") return None - - def _resolve_mfa_serial(self, parsed_args, source_config): """Resolve MFA serial from args, config, or prompt.""" mfa_serial = parsed_args.serial_number or source_config.get('mfa_serial') @@ -279,4 +277,4 @@ def _handle_interactive_prompting(self, parsed_args, duration_seconds): # Write credentials and return return self._write_temporary_credentials( response['Credentials'], target_profile - ) + ) \ No newline at end of file diff --git a/tests/unit/customizations/configure/test_mfalogin.py b/tests/unit/customizations/configure/test_mfalogin.py index 9b4385c4187a..1c6ab7ba9b18 100644 --- a/tests/unit/customizations/configure/test_mfalogin.py +++ b/tests/unit/customizations/configure/test_mfalogin.py @@ -81,10 +81,8 @@ def test_no_credentials_found(self): ] self.prompter.get_value.return_value = 'session-test' # profile name - # Mock botocore.session.Session - mock_session = mock.Mock() - mock_session.get_credentials.return_value = None - mock_session.available_profiles = ['default'] + # Set session to return None credentials to trigger interactive prompting + self.session.get_credentials.return_value = None # Mock STS for the interactive prompting path sts_client = mock.Mock() @@ -96,9 +94,12 @@ def test_no_credentials_found(self): 'Expiration': datetime.datetime(2023, 5, 19, 18, 6, 10), } } - mock_session.create_client.return_value = sts_client - - with mock.patch('botocore.session.Session', return_value=mock_session): + + with mock.patch('botocore.session.Session') as mock_session_class: + mock_session_instance = mock.Mock() + mock_session_instance.create_client.return_value = sts_client + mock_session_class.return_value = mock_session_instance + with mock.patch( 'sys.stdin.isatty', return_value=False ): # Non-interactive @@ -112,20 +113,43 @@ def test_no_credentials_found(self): def test_profile_not_found(self): # Set profile to a non-existent profile self.parsed_globals.profile = 'nonexistent' - - # Mock botocore.session.Session - mock_session = mock.Mock() - mock_session.available_profiles = ['default', 'test'] - - with mock.patch('botocore.session.Session', return_value=mock_session): - with mock.patch('sys.stderr') as mock_stderr: - rc = self.command._run_main( - self.parsed_args, self.parsed_globals - ) - self.assertEqual(rc, 1) - mock_stderr.write.assert_called_with( - "The profile (nonexistent) could not be found. \n" - ) + + # Mock the session to have no credentials for the nonexistent profile + self.session.get_credentials.return_value = None + self.session.get_scoped_config.return_value = {} + + # Setup mock responses for interactive prompting since no credentials found + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key + 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key + 'arn:aws:iam::123456789012:mfa/user', # MFA serial + '123456', # MFA token + ] + self.prompter.get_value.return_value = 'session-test' # profile name + + # Mock STS for the interactive prompting path + sts_client = mock.Mock() + sts_client.get_session_token.return_value = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': datetime.datetime(2023, 5, 19, 18, 6, 10), + } + } + + with mock.patch('botocore.session.Session') as mock_session_class: + mock_session_instance = mock.Mock() + mock_session_instance.create_client.return_value = sts_client + mock_session_class.return_value = mock_session_instance + + with mock.patch('os.path.expanduser', return_value='/tmp/credentials'): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + # Should succeed via interactive prompting + self.assertEqual(rc, 0) def test_no_mfa_serial_provided(self): # Mock botocore.session.Session @@ -169,11 +193,7 @@ def test_no_token_code_provided(self): ) def test_sts_client_error(self): - # Mock botocore.session.Session - mock_session = mock.Mock() - mock_session.get_credentials.return_value = mock.Mock() - mock_session.get_scoped_config.return_value = {} - mock_session.available_profiles = ['default'] + self.session.get_scoped_config.return_value = {} sts_client = mock.Mock() sts_client.get_session_token.side_effect = ClientError( @@ -185,23 +205,22 @@ def test_sts_client_error(self): }, 'GetSessionToken', ) - mock_session.create_client.return_value = sts_client + self.session.create_client.return_value = sts_client - with mock.patch('botocore.session.Session', return_value=mock_session): - self.prompter.get_credential_value.side_effect = [ - 'arn:aws:iam::123456789012:mfa/user', - '123456', - ] - self.prompter.get_value.return_value = 'session-test' + self.prompter.get_credential_value.side_effect = [ + 'arn:aws:iam::123456789012:mfa/user', + '123456', + ] + self.prompter.get_value.return_value = 'session-test' - with mock.patch('sys.stderr') as mock_stderr: - rc = self.command._run_main( - self.parsed_args, self.parsed_globals - ) - self.assertEqual(rc, 1) - mock_stderr.write.assert_called_with( - mock.ANY - ) # Just check it was called + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + mock.ANY + ) # Just check it was called def test_successful_mfa_login(self): # Setup @@ -222,25 +241,18 @@ def test_successful_mfa_login(self): } } - # Mock botocore.session.Session - mock_session = mock.Mock() - mock_session.get_credentials.return_value = mock.Mock() - mock_session.get_scoped_config.return_value = {} - mock_session.available_profiles = ['default'] - sts_client = mock.Mock() sts_client.get_session_token.return_value = sts_response - mock_session.create_client.return_value = sts_client + self.session.create_client.return_value = sts_client - with mock.patch('botocore.session.Session', return_value=mock_session): - with mock.patch('sys.stdin.isatty', return_value=True): - with mock.patch( - 'os.path.expanduser', return_value='/tmp/credentials' - ): - with mock.patch('sys.stdout'): - rc = self.command._run_main( - self.parsed_args, self.parsed_globals - ) + with mock.patch('sys.stdin.isatty', return_value=True): + with mock.patch( + 'os.path.expanduser', return_value='/tmp/credentials' + ): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) # Verify self.assertEqual(rc, 0) @@ -258,7 +270,6 @@ def test_successful_mfa_login(self): 'aws_access_key_id': 'ASIAIOSFODNN7EXAMPLE', 'aws_secret_access_key': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', 'aws_session_token': 'AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE', - '#Credentials expire at: ': f"{expiration.strftime('%Y-%m-%d %H:%M:%S UTC')}", } self.config_writer.update_config.assert_called_with( @@ -272,13 +283,9 @@ def test_serial_number_from_parameter(self): ) self.parsed_args.duration_seconds = 43200 - # Mock botocore.session.Session - mock_session = mock.Mock() - mock_session.get_credentials.return_value = mock.Mock() - mock_session.get_scoped_config.return_value = { + self.session.get_scoped_config.return_value = { 'mfa_serial': 'arn:aws:iam::123456789012:mfa/user-config' } - mock_session.available_profiles = ['default'] sts_client = mock.Mock() expiration = datetime.datetime(2023, 5, 19, 18, 6, 10) @@ -291,19 +298,18 @@ def test_serial_number_from_parameter(self): } } sts_client.get_session_token.return_value = sts_response - mock_session.create_client.return_value = sts_client + self.session.create_client.return_value = sts_client - with mock.patch('botocore.session.Session', return_value=mock_session): - with mock.patch('sys.stdin.isatty', return_value=True): - self.prompter.get_credential_value.return_value = '123456' - self.prompter.get_value.return_value = 'session-test' - with mock.patch( - 'os.path.expanduser', return_value='/tmp/credentials' - ): - with mock.patch('sys.stdout'): - rc = self.command._run_main( - self.parsed_args, self.parsed_globals - ) + with mock.patch('sys.stdin.isatty', return_value=True): + self.prompter.get_credential_value.return_value = '123456' + self.prompter.get_value.return_value = 'session-test' + with mock.patch( + 'os.path.expanduser', return_value='/tmp/credentials' + ): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) # Verify self.assertEqual(rc, 0) @@ -318,27 +324,26 @@ def test_serial_number_from_parameter(self): def test_missing_default_profile_interactive(self): """Test prompting for credentials when no default profile exists in interactive mode.""" self.parsed_globals.profile = None # Use default profile + + # Set session to return None credentials to trigger interactive prompting + self.session.get_credentials.return_value = None + + # Mock sys.stdin.isatty to return True (interactive) + with mock.patch('sys.stdin.isatty', return_value=True): + # Mock the _handle_interactive_prompting method + with mock.patch.object( + self.command, + '_handle_interactive_prompting', + return_value=0, + ) as mock_handle: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) - # Mock botocore.session.Session to raise ProfileNotFound - with mock.patch('botocore.session.Session') as mock_session_class: - mock_session_class.side_effect = ProfileNotFound(profile='default') - - # Mock sys.stdin.isatty to return True (interactive) - with mock.patch('sys.stdin.isatty', return_value=True): - # Mock the _handle_interactive_prompting method - with mock.patch.object( - self.command, - '_handle_interactive_prompting', - return_value=0, - ) as mock_handle: - rc = self.command._run_main( - self.parsed_args, self.parsed_globals - ) - - self.assertEqual(rc, 0) - mock_handle.assert_called_once_with( - self.parsed_args, None - ) + self.assertEqual(rc, 0) + mock_handle.assert_called_once_with( + self.parsed_args, None + ) def test_missing_default_profile_non_interactive(self): """Test error when no default profile exists in non-interactive mode.""" @@ -353,25 +358,26 @@ def test_missing_default_profile_non_interactive(self): ] self.prompter.get_value.return_value = 'session-test' # profile name - # Mock botocore.session.Session to return None credentials - mock_session = mock.Mock() - mock_session.get_credentials.return_value = None - mock_session.available_profiles = ['default'] + # Set session to return None credentials to trigger interactive prompting + self.session.get_credentials.return_value = None - with mock.patch('botocore.session.Session', return_value=mock_session): + # Mock STS for the interactive prompting path + sts_client = mock.Mock() + sts_client.get_session_token.return_value = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': datetime.datetime(2023, 5, 19, 18, 6, 10), + } + } + + with mock.patch('botocore.session.Session') as mock_session_class: + mock_session_instance = mock.Mock() + mock_session_instance.create_client.return_value = sts_client + mock_session_class.return_value = mock_session_instance + with mock.patch('sys.stdin.isatty', return_value=False): - # Mock STS for the interactive prompting path - sts_client = mock.Mock() - sts_client.get_session_token.return_value = { - 'Credentials': { - 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', - 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', - 'SessionToken': 'SESSION_TOKEN', - 'Expiration': datetime.datetime(2023, 5, 19, 18, 6, 10), - } - } - mock_session.create_client.return_value = sts_client - with mock.patch('os.path.expanduser', return_value='/tmp/credentials'): with mock.patch('sys.stdout'): rc = self.command._run_main( @@ -539,27 +545,23 @@ def test_non_interactive_missing_mfa_serial(self): def test_non_interactive_missing_token_code(self): """Test non-interactive mode when token code would be prompted.""" - mock_session = mock.Mock() - mock_session.get_credentials.return_value = mock.Mock() - mock_session.get_scoped_config.return_value = { + self.session.get_scoped_config.return_value = { 'mfa_serial': 'arn:aws:iam::123456789012:mfa/user' } - mock_session.available_profiles = ['default'] - with mock.patch('botocore.session.Session', return_value=mock_session): - with mock.patch( - 'sys.stdin.isatty', return_value=False - ): # Non-interactive - self.prompter.get_credential_value.return_value = None - with mock.patch('sys.stderr') as mock_stderr: - rc = self.command._run_main( - self.parsed_args, self.parsed_globals - ) + with mock.patch( + 'sys.stdin.isatty', return_value=False + ): # Non-interactive + self.prompter.get_credential_value.return_value = None + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) - self.assertEqual(rc, 1) - mock_stderr.write.assert_called_with( - "MFA token code is required\n" - ) + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "MFA token code is required\n" + ) def test_empty_credential_input_handling(self): """Test handling of empty credential inputs.""" From 34a0a580bb0719d0f4bfc39cb94148dffdb6e238 Mon Sep 17 00:00:00 2001 From: liam wadman Date: Wed, 3 Sep 2025 16:43:35 -0700 Subject: [PATCH 17/17] improved examples --- awscli/examples/configure/mfa-login.rst | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/awscli/examples/configure/mfa-login.rst b/awscli/examples/configure/mfa-login.rst index 5c21a165c7f2..b5290179248f 100644 --- a/awscli/examples/configure/mfa-login.rst +++ b/awscli/examples/configure/mfa-login.rst @@ -6,12 +6,12 @@ The following ``mfa-login`` command creates a new profile with temporary credent Output:: - MFA serial number or ARN: arn:aws:iam::123456789012:mfa/user + MFA serial number or ARN: arn:aws:iam::123456789012:mfa/MFADeviceName MFA token code: 123456 - Profile to update [session-12345]: - Temporary credentials written to profile 'session-12345' + Profile to update [session-MFADeviceName]: + Temporary credentials written to profile 'session-MFADeviceName' Credentials will expire at 2023-05-19 18:06:10 UTC - To use these credentials, specify --profile session-12345 when running AWS CLI commands + To use these credentials, specify --profile session-MFADeviceName when running AWS CLI commands **To create credentials when no default profile exists** @@ -24,26 +24,26 @@ Output:: No default profile found. Please provide your AWS credentials: AWS Access Key ID: AKIAIOSFODNN7EXAMPLE AWS Secret Access Key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY - MFA serial number or ARN: arn:aws:iam::123456789012:mfa/user + MFA serial number or ARN: arn:aws:iam::123456789012:mfa/MFADeviceName MFA token code: 123456 - Profile to update [session-12345]: - Temporary credentials written to profile 'session-12345' + Profile to update [session-MFADeviceName]: + Temporary credentials written to profile 'session-MFADeviceName' Credentials will expire at 2023-05-19 18:06:10 UTC - To use these credentials, specify --profile session-12345 when running AWS CLI commands + To use these credentials, specify --profile session-MFADeviceName when running AWS CLI commands **To update an existing profile with temporary MFA credentials** The following ``mfa-login`` command updates an existing profile with temporary credentials obtained using MFA authentication. :: - aws configure mfa-login --profile myprofile --update-profile mytemp + aws configure mfa-login --profile myprofile --update-profile mfaprofile Output:: MFA token code: 123456 - Temporary credentials written to profile 'mytemp' + Temporary credentials written to profile 'mfaprofile' Credentials will expire at 2023-05-19 18:06:10 UTC - To use these credentials, specify --profile mytemp when running AWS CLI commands + To use these credentials, specify --profile mfaprofile when running AWS CLI commands -**Note:** This command currently supports only hardware or software based one-time password (OTP) authenticators. Passkeys and U2F devices are not currently supported. +**Note:** This command currently supports only hardware or software based one-time password (OTP) authenticators. Passkeys and U2F devices are not currently supported with this command. For more information, see `Using Multi-Factor Authentication (MFA) in AWS `__ in the *AWS IAM User Guide*. \ No newline at end of file