diff --git a/README.md b/README.md index 2d717119..9940767c 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ Makes working with AWS Cognito easier for Python developers. - [Confirm Forgot Password](#confirm-forgot-password) - [Change Password](#change-password) - [Confirm Sign Up](#confirm-sign-up) + - [Resend Confirmation Code](#resend-confirmation-code) - [Update Profile](#update-profile) - [Send Verification](#send-verification) - [Get User Object](#get-user-object) @@ -264,6 +265,18 @@ u = Cognito('your-user-pool-id','your-client-id') u.confirm_sign_up('users-conf-code',username='bob') ``` +#### Resend Confirmation Code + +Resend the confirmation for registration + +```python +from warrant import Cognito + +u = Cognito('your-user-pool-id','your-client-id') + +u.resend_confirmation_code(username='bob') +``` + ##### Arguments - **confirmation_code:** Confirmation code sent via text or email @@ -444,21 +457,11 @@ This is the preferred method of user authentication with AWS Cognito. The process involves a series of authentication challenges and responses, which if successful, results in a final response that contains ID, access and refresh tokens. -### Using AWSSRP -The `AWSSRP` class takes a username, password, cognito user pool id, cognito app id, an optional -client secret (if app client is configured with client secret), an optional pool_region or `boto3` client. -Afterwards, the `authenticate_user` class method is used for SRP authentication. +### Using AWSSRP (Now WarrantLite) +The `AWSSRP` code has moved to [Warrant-Lite](https://github.com/capless/warrant-lite). Some projects don't need all of the features that Warrant has so we decided to make a separate. -```python -import boto3 -from warrant.aws_srp import AWSSRP -client = boto3.client('cognito-idp') -aws = AWSSRP(username='username', password='password', pool_id='user_pool_id', - client_id='client_id', client=client) -tokens = aws.authenticate_user() -``` ## Projects Using Warrant diff --git a/requirements.txt b/requirements.txt index 48b028b2..31ebaa49 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ boto3>=1.4.3 envs>=0.3.0 -python-jose-cryptodome>=1.3.2 requests>=2.13.0 +pycryptodome>=3.9.0 +warrant-lite>=1.0.4 diff --git a/setup.py b/setup.py index f51d154c..eef6c014 100644 --- a/setup.py +++ b/setup.py @@ -1,11 +1,16 @@ -import os - from setuptools import setup, find_packages -from pip.req import parse_requirements + +# pip >= 10 +try: + from pip._internal.req import parse_requirements +# pip <= 9.0.3 +except ImportError: + from pip.req import parse_requirements install_reqs = parse_requirements('requirements.txt', session=False) test_reqs = parse_requirements('requirements_test.txt', session=False) + version = '0.6.1' README="""Python class to integrate Boto3's Cognito client so it is easy to login users. With SRP support.""" diff --git a/warrant/__init__.py b/warrant/__init__.py index 2a0c2313..26edaf4f 100644 --- a/warrant/__init__.py +++ b/warrant/__init__.py @@ -3,11 +3,13 @@ import datetime import re import requests +from botocore.config import Config +from botocore import UNSIGNED from envs import env from jose import jwt, JWTError +from warrant_lite import WarrantLite -from .aws_srp import AWSSRP from .exceptions import TokenVerificationException @@ -137,9 +139,9 @@ def __init__( self, user_pool_id, client_id,user_pool_region=None, username=None, id_token=None, refresh_token=None, access_token=None, client_secret=None, - access_key=None, secret_key=None, + access_key=None, secret_key=None ): - """ + """ :param user_pool_id: Cognito User Pool ID :param client_id: Cognito User Pool Application client ID :param username: User Pool username @@ -152,7 +154,7 @@ def __init__( self.user_pool_id = user_pool_id self.client_id = client_id - self.user_pool_region = self.user_pool_id.split('_')[0] + self.user_pool_region = self.user_pool_id.split('_')[0] self.username = username self.id_token = id_token self.access_token = access_token @@ -163,13 +165,18 @@ def __init__( self.base_attributes = None boto3_client_kwargs = {} + boto3_client_kwargs['service_name'] = 'cognito-idp' if access_key and secret_key: boto3_client_kwargs['aws_access_key_id'] = access_key boto3_client_kwargs['aws_secret_access_key'] = secret_key if user_pool_region: boto3_client_kwargs['region_name'] = user_pool_region + + # if access_key and secret_key is not provided, make all calls UNSIGNED + if not access_key and not secret_key: + boto3_client_kwargs['config'] = Config(signature_version=UNSIGNED) - self.client = boto3.client('cognito-idp', **boto3_client_kwargs) + self.client = boto3.client(**boto3_client_kwargs) def get_keys(self): @@ -242,7 +249,7 @@ def switch_session(self,session): """ self.client = session.client('cognito-idp') - def check_token(self, renew=True): + def check_token(self, renew=True): """ Checks the exp attribute of the access_token and either refreshes the tokens by calling the renew_access_tokens method or does nothing @@ -268,14 +275,14 @@ def add_base_attributes(self, **kwargs): def add_custom_attributes(self, **kwargs): custom_key = 'custom' custom_attributes = {} - + for old_key, value in kwargs.items(): new_key = custom_key + ':' + old_key custom_attributes[new_key] = value - + self.custom_attributes = custom_attributes - def register(self, username, password, attr_map=None): + def register(self, username, password, attr_map=None, validation_data=None): """ Register the user. Other base attributes from AWS Cognito User Pools are address, birthdate, email, family_name (last name), gender, @@ -285,6 +292,7 @@ def register(self, username, password, attr_map=None): :param username: User Pool username :param password: User Pool password :param attr_map: Attribute map to Cognito's attributes + :param validation_data: Validation data dict for custom validation in pre-sign up trigger :return response: Response from Cognito Example response:: @@ -297,7 +305,10 @@ def register(self, username, password, attr_map=None): } } """ - attributes = self.base_attributes.copy() + if self.base_attributes is None: + attributes = {} + else: + attributes = self.base_attributes.copy() if self.custom_attributes: attributes.update(self.custom_attributes) cognito_attributes = dict_to_cognito(attributes, attr_map) @@ -307,6 +318,10 @@ def register(self, username, password, attr_map=None): 'Password': password, 'UserAttributes': cognito_attributes } + if validation_data: + cognito_validation_data = dict_to_cognito(validation_data) + params.update({'ValidationData': cognito_validation_data}) + self._add_secret_hash(params, 'SecretHash') response = self.client.sign_up(**params) @@ -330,7 +345,7 @@ def admin_confirm_sign_up(self, username=None): Username=username, ) - def confirm_sign_up(self,confirmation_code,username=None): + def confirm_sign_up(self, confirmation_code, username=None): """ Using the confirmation code that is either sent via email or text message. @@ -338,14 +353,22 @@ def confirm_sign_up(self,confirmation_code,username=None): :param username: User's username :return: """ - if not username: - username = self.username params = {'ClientId': self.client_id, - 'Username': username, + 'Username': self.username if username is None else username, 'ConfirmationCode': confirmation_code} self._add_secret_hash(params, 'SecretHash') self.client.confirm_sign_up(**params) + def resend_confirmation_code(self, username=None): + """ + Resend the confirmation for registration + :param username: User's username + """ + params = {'ClientId': self.client_id, + 'Username': self.username if username is None else username} + self._add_secret_hash(params, 'SecretHash') + self.client.resend_confirmation_code(**params) + def admin_authenticate(self, password): """ Authenticate the user using admin super privileges @@ -370,13 +393,14 @@ def admin_authenticate(self, password): self.verify_token(tokens['AuthenticationResult']['AccessToken'], 'access_token','access') self.token_type = tokens['AuthenticationResult']['TokenType'] - def authenticate(self, password): + def authenticate(self, password): """ Authenticate the user using the SRP protocol :param password: The user's passsword :return: """ - aws = AWSSRP(username=self.username, password=password, pool_id=self.user_pool_id, + + aws = WarrantLite(username=self.username, password=password, pool_id=self.user_pool_id, client_id=self.client_id, client=self.client, client_secret=self.client_secret) tokens = aws.authenticate_user() @@ -391,7 +415,7 @@ def new_password_challenge(self, password, new_password): :param password: The user's current passsword :param password: The user's new passsword """ - aws = AWSSRP(username=self.username, password=password, pool_id=self.user_pool_id, + aws = WarrantLite(username=self.username, password=password, pool_id=self.user_pool_id, client_id=self.client_id, client=self.client, client_secret=self.client_secret) tokens = aws.set_new_password_challenge(new_password) @@ -459,7 +483,7 @@ def get_user(self, attr_map=None): attribute_list=user.get('UserAttributes'), metadata=user_metadata,attr_map=attr_map) - def get_users(self, attr_map=None): + def get_users(self, attr_map=None, attr_filter=None): """ Returns all users for a user pool. Returns instances of the self.user_class. @@ -467,6 +491,8 @@ def get_users(self, attr_map=None): :return: """ kwargs = {"UserPoolId":self.user_pool_id} + if attr_filter is not None: + kwargs['Filter'] = "%s = \"%s\"" % (attr_filter['Name'], attr_filter['Value']) response = self.client.list_users(**kwargs) return [self.get_user_obj(user.get('Username'), @@ -543,10 +569,13 @@ def validate_verification(self, confirmation_code, attribute='email'): Code=confirmation_code ) - def renew_access_token(self): + def renew_access_token(self): # BUG: stems from check_token - NotAuthorizedException """ Sets a new access token on the User using the refresh token. """ + + # Potential fix: try to make the client request unsigned + auth_params = {'REFRESH_TOKEN': self.refresh_token} self._add_secret_hash(auth_params, 'SECRET_HASH') refresh_response = self.client.initiate_auth( @@ -624,7 +653,7 @@ def _add_secret_hash(self, parameters, key): to a parameters dictionary at a specified key """ if self.client_secret is not None: - secret_hash = AWSSRP.get_secret_hash(self.username, self.client_id, + secret_hash = WarrantLite.get_secret_hash(self.username, self.client_id, self.client_secret) parameters[key] = secret_hash diff --git a/warrant/aws_srp.py b/warrant/aws_srp.py deleted file mode 100644 index 3b53f34e..00000000 --- a/warrant/aws_srp.py +++ /dev/null @@ -1,250 +0,0 @@ -import base64 -import binascii -import datetime -import hashlib -import hmac -import re - -import boto3 -import os -import six - -from .exceptions import ForceChangePasswordException - -# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L22 -n_hex = 'FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1' + '29024E088A67CC74020BBEA63B139B22514A08798E3404DD' + \ - 'EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245' + 'E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED' + \ - 'EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D' + 'C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F' + \ - '83655D23DCA3AD961C62F356208552BB9ED529077096966D' + '670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B' + \ - 'E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9' + 'DE2BCBF6955817183995497CEA956AE515D2261898FA0510' + \ - '15728E5A8AAAC42DAD33170D04507A33A85521ABDF1CBA64' + 'ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7' + \ - 'ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6B' + 'F12FFA06D98A0864D87602733EC86A64521F2B18177B200C' + \ - 'BBE117577A615D6C770988C0BAD946E208E24FA074E5AB31' + '43DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF' -# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L49 -g_hex = '2' -info_bits = bytearray('Caldera Derived Key', 'utf-8') - - -def hash_sha256(buf): - """AuthenticationHelper.hash""" - a = hashlib.sha256(buf).hexdigest() - return (64 - len(a)) * '0' + a - - -def hex_hash(hex_string): - return hash_sha256(bytearray.fromhex(hex_string)) - - -def hex_to_long(hex_string): - return int(hex_string, 16) - - -def long_to_hex(long_num): - return '%x' % long_num - - -def get_random(nbytes): - random_hex = binascii.hexlify(os.urandom(nbytes)) - return hex_to_long(random_hex) - - -def pad_hex(long_int): - """ - Converts a Long integer (or hex string) to hex format padded with zeroes for hashing - :param {Long integer|String} long_int Number or string to pad. - :return {String} Padded hex string. - """ - if not isinstance(long_int, six.string_types): - hash_str = long_to_hex(long_int) - else: - hash_str = long_int - if len(hash_str) % 2 == 1: - hash_str = '0%s' % hash_str - elif hash_str[0] in '89ABCDEFabcdef': - hash_str = '00%s' % hash_str - return hash_str - - -def compute_hkdf(ikm, salt): - """ - Standard hkdf algorithm - :param {Buffer} ikm Input key material. - :param {Buffer} salt Salt value. - :return {Buffer} Strong key material. - @private - """ - prk = hmac.new(salt, ikm, hashlib.sha256).digest() - info_bits_update = info_bits + bytearray(chr(1), 'utf-8') - hmac_hash = hmac.new(prk, info_bits_update, hashlib.sha256).digest() - return hmac_hash[:16] - - -def calculate_u(big_a, big_b): - """ - Calculate the client's value U which is the hash of A and B - :param {Long integer} big_a Large A value. - :param {Long integer} big_b Server B value. - :return {Long integer} Computed U value. - """ - u_hex_hash = hex_hash(pad_hex(big_a) + pad_hex(big_b)) - return hex_to_long(u_hex_hash) - - -class AWSSRP(object): - - NEW_PASSWORD_REQUIRED_CHALLENGE = 'NEW_PASSWORD_REQUIRED' - PASSWORD_VERIFIER_CHALLENGE = 'PASSWORD_VERIFIER' - - def __init__(self, username, password, pool_id, client_id, pool_region=None, - client=None, client_secret=None): - if pool_region is not None and client is not None: - raise ValueError("pool_region and client should not both be specified " - "(region should be passed to the boto3 client instead)") - - self.username = username - self.password = password - self.pool_id = pool_id - self.client_id = client_id - self.client_secret = client_secret - self.client = client if client else boto3.client('cognito-idp', region_name=pool_region) - self.big_n = hex_to_long(n_hex) - self.g = hex_to_long(g_hex) - self.k = hex_to_long(hex_hash('00' + n_hex + '0' + g_hex)) - self.small_a_value = self.generate_random_small_a() - self.large_a_value = self.calculate_a() - - def generate_random_small_a(self): - """ - helper function to generate a random big integer - :return {Long integer} a random value. - """ - random_long_int = get_random(128) - return random_long_int % self.big_n - - def calculate_a(self): - """ - Calculate the client's public value A = g^a%N - with the generated random number a - :param {Long integer} a Randomly generated small A. - :return {Long integer} Computed large A. - """ - big_a = pow(self.g, self.small_a_value, self.big_n) - # safety check - if (big_a % self.big_n) == 0: - raise ValueError('Safety check for A failed') - return big_a - - def get_password_authentication_key(self, username, password, server_b_value, salt): - """ - Calculates the final hkdf based on computed S value, and computed U value and the key - :param {String} username Username. - :param {String} password Password. - :param {Long integer} server_b_value Server B value. - :param {Long integer} salt Generated salt. - :return {Buffer} Computed HKDF value. - """ - u_value = calculate_u(self.large_a_value, server_b_value) - if u_value == 0: - raise ValueError('U cannot be zero.') - username_password = '%s%s:%s' % (self.pool_id.split('_')[1], username, password) - username_password_hash = hash_sha256(username_password.encode('utf-8')) - - x_value = hex_to_long(hex_hash(pad_hex(salt) + username_password_hash)) - g_mod_pow_xn = pow(self.g, x_value, self.big_n) - int_value2 = server_b_value - self.k * g_mod_pow_xn - s_value = pow(int_value2, self.small_a_value + u_value * x_value, self.big_n) - hkdf = compute_hkdf(bytearray.fromhex(pad_hex(s_value)), - bytearray.fromhex(pad_hex(long_to_hex(u_value)))) - return hkdf - - def get_auth_params(self): - auth_params = {'USERNAME': self.username, - 'SRP_A': long_to_hex(self.large_a_value)} - if self.client_secret is not None: - auth_params.update({ - "SECRET_HASH": - self.get_secret_hash(self.username,self.client_id, self.client_secret)}) - return auth_params - - @staticmethod - def get_secret_hash(username, client_id, client_secret): - message = bytearray(username + client_id, 'utf-8') - hmac_obj = hmac.new(bytearray(client_secret, 'utf-8'), message, hashlib.sha256) - return base64.standard_b64encode(hmac_obj.digest()).decode('utf-8') - - def process_challenge(self, challenge_parameters): - user_id_for_srp = challenge_parameters['USER_ID_FOR_SRP'] - salt_hex = challenge_parameters['SALT'] - srp_b_hex = challenge_parameters['SRP_B'] - secret_block_b64 = challenge_parameters['SECRET_BLOCK'] - # re strips leading zero from a day number (required by AWS Cognito) - timestamp = re.sub(r" 0(\d) ", r" \1 ", - datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y")) - hkdf = self.get_password_authentication_key(user_id_for_srp, - self.password, hex_to_long(srp_b_hex), salt_hex) - secret_block_bytes = base64.standard_b64decode(secret_block_b64) - msg = bytearray(self.pool_id.split('_')[1], 'utf-8') + bytearray(user_id_for_srp, 'utf-8') + \ - bytearray(secret_block_bytes) + bytearray(timestamp, 'utf-8') - hmac_obj = hmac.new(hkdf, msg, digestmod=hashlib.sha256) - signature_string = base64.standard_b64encode(hmac_obj.digest()) - response = {'TIMESTAMP': timestamp, - 'USERNAME': user_id_for_srp, - 'PASSWORD_CLAIM_SECRET_BLOCK': secret_block_b64, - 'PASSWORD_CLAIM_SIGNATURE': signature_string.decode('utf-8')} - if self.client_secret is not None: - response.update({ - "SECRET_HASH": - self.get_secret_hash(self.username, self.client_id, self.client_secret)}) - return response - - def authenticate_user(self, client=None): - boto_client = self.client or client - auth_params = self.get_auth_params() - response = boto_client.initiate_auth( - AuthFlow='USER_SRP_AUTH', - AuthParameters=auth_params, - ClientId=self.client_id - ) - if response['ChallengeName'] == self.PASSWORD_VERIFIER_CHALLENGE: - challenge_response = self.process_challenge(response['ChallengeParameters']) - tokens = boto_client.respond_to_auth_challenge( - ClientId=self.client_id, - ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE, - ChallengeResponses=challenge_response) - - if tokens.get('ChallengeName') == self.NEW_PASSWORD_REQUIRED_CHALLENGE: - raise ForceChangePasswordException('Change password before authenticating') - - return tokens - else: - raise NotImplementedError('The %s challenge is not supported' % response['ChallengeName']) - - def set_new_password_challenge(self, new_password, client=None): - boto_client = self.client or client - auth_params = self.get_auth_params() - response = boto_client.initiate_auth( - AuthFlow='USER_SRP_AUTH', - AuthParameters=auth_params, - ClientId=self.client_id - ) - if response['ChallengeName'] == self.PASSWORD_VERIFIER_CHALLENGE: - challenge_response = self.process_challenge(response['ChallengeParameters']) - tokens = boto_client.respond_to_auth_challenge( - ClientId=self.client_id, - ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE, - ChallengeResponses=challenge_response) - - if tokens['ChallengeName'] == self.NEW_PASSWORD_REQUIRED_CHALLENGE: - challenge_response = { - 'USERNAME': auth_params['USERNAME'], - 'NEW_PASSWORD': new_password - } - new_password_response = boto_client.respond_to_auth_challenge( - ClientId=self.client_id, - ChallengeName=self.NEW_PASSWORD_REQUIRED_CHALLENGE, - Session=tokens['Session'], - ChallengeResponses=challenge_response) - return new_password_response - return tokens - else: - raise NotImplementedError('The %s challenge is not supported' % response['ChallengeName']) diff --git a/warrant/tests/tests.py b/warrant/tests/tests.py index a653b892..5559e721 100644 --- a/warrant/tests/tests.py +++ b/warrant/tests/tests.py @@ -4,7 +4,6 @@ from envs import env from warrant import Cognito, UserObj, GroupObj, TokenVerificationException -from warrant.aws_srp import AWSSRP class UserObjTestCase(unittest.TestCase): @@ -170,31 +169,5 @@ def test_admin_authenticate(self): self.assertNotEqual(self.user.refresh_token, None) -class AWSSRPTestCase(unittest.TestCase): - - def setUp(self): - if env('USE_CLIENT_SECRET') == 'True': - self.client_secret = env('COGNITO_CLIENT_SECRET') - self.app_id = env('COGNITO_APP_WITH_SECRET_ID') - else: - self.app_id = env('COGNITO_APP_ID') - self.client_secret = None - self.cognito_user_pool_id = env('COGNITO_USER_POOL_ID') - self.username = env('COGNITO_TEST_USERNAME') - self.password = env('COGNITO_TEST_PASSWORD') - self.aws = AWSSRP(username=self.username, password=self.password, - pool_id=self.cognito_user_pool_id, - client_id=self.app_id, client_secret=self.client_secret) - - def tearDown(self): - del self.aws - - def test_authenticate_user(self): - tokens = self.aws.authenticate_user() - self.assertTrue('IdToken' in tokens['AuthenticationResult']) - self.assertTrue('AccessToken' in tokens['AuthenticationResult']) - self.assertTrue('RefreshToken' in tokens['AuthenticationResult']) - - if __name__ == '__main__': unittest.main()