-
Notifications
You must be signed in to change notification settings - Fork 5
Open
Description
Hi guys,
I implemented a Python 3 async version of the Emailage API client.
Here is the code:
#!/usr/bin/env python3
import base64
import hmac
import json
import re
import time
import urllib
import urllib.parse
from hashlib import sha1
from uuid import uuid4

import aiohttp
from six import b
class AioEmailageClient(object):
    """Asynchronous client for the Emailage validator API, built on aiohttp.

    Every request is a GET signed with OAuth 1.0 HMAC-SHA1: ``secret`` is
    sent as ``oauth_consumer_key`` and ``token + '&'`` is used as the HMAC
    key.

    The underlying ``aiohttp.ClientSession`` is created lazily on first use
    so that constructing the client does not require a running event loop.
    Release it with ``await client.close()`` or by using the client as an
    async context manager::

        async with AioEmailageClient(secret, token) as client:
            result = await client.query_email('user@example.com')
    """

    def __init__(self, secret, token):
        """Store the credentials and derive the HMAC signing key.

        Args:
            secret: Value sent as ``oauth_consumer_key``.
            token: Value used (with a trailing ``&``) as the HMAC key.
        """
        self.secret, self.token = secret, token
        self.hmac_key = token + '&'
        self.domain = 'https://api.emailage.com'
        # Created on first access of ``self.session``; see the property.
        self._session = None

    @property
    def session(self):
        """Return the shared ``aiohttp.ClientSession``, creating it if needed."""
        if self._session is None:
            self._session = aiohttp.ClientSession()
        return self._session

    @session.setter
    def session(self, value):
        # Keep ``client.session = ...`` working for callers that inject
        # their own session.
        self._session = value

    async def close(self):
        """Close the underlying HTTP session, if one was created."""
        session, self._session = self._session, None
        if session is not None and not session.closed:
            await session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

    @staticmethod
    def _quote(obj):
        """Percent-encode ``str(obj)``, leaving no reserved character unescaped."""
        return urllib.parse.quote(str(obj), safe='')

    @classmethod
    def _normalize_query_parameters(cls, params):
        """Return ``params`` as a sorted, percent-encoded ``k=v&k=v`` string."""
        return '&'.join(
            '='.join((cls._quote(key), cls._quote(value)))
            for key, value in sorted(params.items())
        )

    @classmethod
    def _signature(cls, method, url, params, hmac_key):
        """Compute the base64-encoded HMAC-SHA1 signature for a request.

        The signature base string is the upper-cased method, the URL and the
        normalized query string, each percent-encoded and joined with ``&``.

        Args:
            method: HTTP method name (case-insensitive).
            url: Request URL without the query string.
            params: Mapping of query parameters to sign.
            hmac_key: Key for the HMAC, as text.

        Returns:
            The signature as an ASCII ``str``.
        """
        query = cls._normalize_query_parameters(params)
        base_string = '&'.join(
            map(cls._quote, (str(method).upper(), url, query))
        )
        # latin-1 matches what ``six.b`` does on Python 3, so the signed
        # bytes are identical to the previous implementation.
        digest = hmac.new(
            hmac_key.encode('latin-1'),
            base_string.encode('latin-1'),
            sha1,
        ).digest()
        return base64.b64encode(digest).decode('ascii')

    async def request(self, endpoint, **params):
        """Send a signed GET request and return the decoded JSON response.

        Args:
            endpoint: Path fragment inserted between ``/emailagevalidator``
                and the trailing ``/`` (may be empty).
            **params: Extra query parameters to send and sign.

        Returns:
            The parsed JSON payload.
        """
        url = self.domain + '/emailagevalidator' + endpoint + '/'
        params = dict(
            format='json',
            oauth_consumer_key=self.secret,
            oauth_nonce=str(uuid4()),
            oauth_signature_method='HMAC-SHA1',
            oauth_timestamp=int(time.time()),
            oauth_version="1.0",
            **params
        )
        # The signature covers every parameter except itself, so it is
        # added only after it has been computed.
        params['oauth_signature'] = self._signature(
            'GET', url, params, self.hmac_key
        )

        async with self.session.get(url, params=params) as resp:
            payload = await resp.text()

        # Remove any unreadable characters (everything before the first
        # '{') from the response payload.
        return json.loads(re.sub(r'^[^{]+', '', payload))

    async def query(self, query, **params):
        """Query the API for a value or a combination of values.

        Args:
            query: A single query string, or a tuple/list of strings that
                are joined with ``+`` into one query.
            **params: Extra query parameters passed to ``request``.

        Returns:
            The parsed JSON payload returned by ``request``.
        """
        if isinstance(query, (tuple, list)):
            query = '+'.join(query)
        params['query'] = query
        return await self.request('', **params)

    async def query_email(self, email, **params):
        """Query the API for an email address after a basic format check.

        Args:
            email: Email address to look up.
            **params: Extra query parameters passed to ``query``.

        Returns:
            The parsed JSON payload returned by ``query``.

        Raises:
            ValueError: If ``email`` does not look like an email address.
        """
        if not re.match(r'^[^@\s]+@([^@\s]+\.)+[^@\s]+$', email):
            raise ValueError('{} is not a valid email address.'.format(email))
        return await self.query(email, **params)
Metadata
Assignees
Labels
No labels