Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 34 additions & 9 deletions src/keycloak/openid_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
except ImportError:
from urllib import urlencode # noqa: F041

from jose import jwt
from jose import jwt, ExpiredSignatureError

PATH_WELL_KNOWN = "auth/realms/{}/.well-known/openid-configuration"


class KeycloakOpenidConnect(WellKnownMixin):

_well_known = None
_client_id = None
_client_secret = None
Expand Down Expand Up @@ -138,10 +137,10 @@ def userinfo(self, token):
url = self.well_known['userinfo_endpoint']

return self._realm.client.get(url, headers={
"Authorization": "Bearer {}".format(
token
)
})
"Authorization": "Bearer {}".format(
token
)
})

def uma_ticket(self, token, **kwargs):
"""
Expand Down Expand Up @@ -196,8 +195,9 @@ def authorization_code(self, code, redirect_uri):
:rtype: dict
:return: Access token response
"""
return self._token_request(grant_type='authorization_code', code=code,
token = self._token_request(grant_type='authorization_code', code=code,
redirect_uri=redirect_uri)
return Token(token, self)

def password_credentials(self, username, password, **kwargs):
"""
Expand All @@ -210,9 +210,10 @@ def password_credentials(self, username, password, **kwargs):
:rtype: dict
:return: Access token response
"""
return self._token_request(grant_type='password',
token = self._token_request(grant_type='password',
username=username, password=password,
**kwargs)
return Token(token, self)

def client_credentials(self, **kwargs):
"""
Expand All @@ -224,7 +225,8 @@ def client_credentials(self, **kwargs):
:rtype: dict
:return: Access token response
"""
return self._token_request(grant_type='client_credentials', **kwargs)
token = self._token_request(grant_type='client_credentials', **kwargs)
return Token(token, self)

def refresh_token(self, refresh_token, **kwargs):
"""
Expand Down Expand Up @@ -306,3 +308,26 @@ def _token_request(self, grant_type, **kwargs):

return self._realm.client.post(self.get_url('token_endpoint'),
data=payload)


class Token:
def __init__(self, token, oidc: KeycloakOpenidConnect) -> None:
self.oidc = oidc
self.key = self.oidc.certs()['keys'][0]
self.token = token

def __getattr__(self, attr):
return self.token[attr]

def __call__(self):
if self.is_expired():
print("Token expired, trying a new one")
self.token = self.oidc.refresh_token(self.token['refresh_token'])
return self.token["access_token"]

def is_expired(self):
try:
self.oidc.decode_token(self.token['access_token'], self.key)
return False
except ExpiredSignatureError:
return True
6 changes: 3 additions & 3 deletions tests/keycloak/test_openid_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def test_authorization_url(self):
)

def test_authorization_code(self):
response = self.openid_client.authorization_code(
token = self.openid_client.authorization_code(
code='some-code',
redirect_uri='https://redirect-uri'
)
Expand All @@ -100,7 +100,7 @@ def test_authorization_code(self):
'redirect_uri': 'https://redirect-uri'
}
)
self.assertEqual(response, self.realm.client.post.return_value)
self.assertEqual(token.token, self.realm.client.post.return_value)

def test_client_credentials(self):
response = self.openid_client.client_credentials(
Expand All @@ -115,7 +115,7 @@ def test_client_credentials(self):
'scope': 'scope another-scope'
}
)
self.assertEqual(response, self.realm.client.post.return_value)
self.assertEqual(response.token, self.realm.client.post.return_value)

def test_refresh_token(self):
response = self.openid_client.refresh_token(
Expand Down