diff --git a/.travis.yml b/.travis.yml index 5339fdd..202f496 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,6 +36,8 @@ before_install: - sudo docker cp tmp:/usr/local/bin/etcdctl /usr/bin/etcdctl && sudo chmod 755 /usr/bin/etcdctl - sudo docker rm tmp - which etcdctl + - sudo mkdir /tmp/shared + - sudo chmod 777 /tmp/shared # command to install dependencies, e.g. pip install -r requirements.txt --use-mirrors install: pip install -U tox-travis codecov diff --git a/etcd3/aio_client.py b/etcd3/aio_client.py index 97756c1..65e07f5 100644 --- a/etcd3/aio_client.py +++ b/etcd3/aio_client.py @@ -9,7 +9,9 @@ import aiohttp import six from aiohttp.client import _RequestContextManager +from asyncio import Lock +from .utils import retry_all_hosts from .baseclient import BaseClient from .baseclient import BaseModelizedStreamResponse from .baseclient import DEFAULT_VERSION @@ -73,7 +75,7 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): self.close() - async def __aiter__(self): + def __aiter__(self): return self async def __anext__(self): @@ -111,7 +113,7 @@ def __init__(self, resp): self.left_chunk = b'' self.i = 0 - async def __aiter__(self): + def __aiter__(self): return self async def next(self): @@ -133,16 +135,18 @@ async def next(self): class AioClient(BaseClient): - def __init__(self, host='127.0.0.1', port=2379, protocol='http', + def __init__(self, host=None, port=None, endpoints=None, protocol='http', cert=(), verify=None, timeout=None, headers=None, user_agent=None, pool_size=30, username=None, password=None, token=None, server_version=DEFAULT_VERSION, cluster_version=DEFAULT_VERSION): - super(AioClient, self).__init__(host=host, port=port, protocol=protocol, - cert=cert, verify=verify, - timeout=timeout, headers=headers, user_agent=user_agent, pool_size=pool_size, - username=username, password=password, token=token, - server_version=server_version, cluster_version=cluster_version) + self.current_endpoint_lock = Lock() + super(AioClient, self).__init__( + host=host, port=port, endpoints=endpoints, protocol=protocol, + cert=cert, verify=verify, timeout=timeout, headers=headers, + user_agent=user_agent, pool_size=pool_size, + username=username, password=password, token=token, + server_version=server_version, cluster_version=cluster_version) self.ssl_context = None if self.cert: if verify is False: @@ -165,7 +169,8 @@ def __init__(self, host='127.0.0.1', port=2379, protocol='http', warnings.warn(Etcd3Warning("the openssl version of your python is too old to support TLSv1.1+," "please upgrade you python")) ssl_context.verify_mode = cert_reqs - ssl_context.load_verify_locations(cafile=cafile) + if cafile: + ssl_context.load_verify_locations(cafile=cafile) ssl_context.load_cert_chain(*self.cert) connector = aiohttp.TCPConnector(limit=pool_size, ssl=self.ssl_context) self.session = aiohttp.ClientSession(connector=connector) @@ -225,6 +230,7 @@ async def _raise_for_status(resp): code = data.get('code') raise get_client_error(error, code, status, resp) + @retry_all_hosts def call_rpc(self, method, data=None, stream=False, encode=True, raw=False, **kwargs): """ call ETCDv3 RPC and return response object diff --git a/etcd3/baseclient.py b/etcd3/baseclient.py index 55aa64e..2e1decf 100644 --- a/etcd3/baseclient.py +++ b/etcd3/baseclient.py @@ -25,6 +25,7 @@ from .swaggerdefs import get_spec from .utils import Etcd3Warning from .utils import log +from .utils import EtcdEndpoint from .version import __version__ @@ -50,13 +51,18 @@ def __iter__(self): class BaseClient(AuthAPI, ClusterAPI, KVAPI, LeaseAPI, MaintenanceAPI, WatchAPI, ExtraAPI, LockAPI): - def __init__(self, host='127.0.0.1', port=2379, protocol='http', - cert=(), verify=None, - timeout=None, headers=None, user_agent=None, pool_size=30, + def __init__(self, host=None, port=None, endpoints=None, protocol='http', cert=(), + verify=None, timeout=None, headers=None, user_agent=None, pool_size=30, username=None, password=None, token=None, - server_version=DEFAULT_VERSION, cluster_version=DEFAULT_VERSION): - self.host = host - self.port = port + server_version=DEFAULT_VERSION, cluster_version=DEFAULT_VERSION, + failover_whitelist=None): + if failover_whitelist is None: + failover_whitelist = ["range", "watch", "status"] + if host is not None: + self.endpoints = ([EtcdEndpoint(host, port)]) + else: + self.endpoints = endpoints + self.current_endpoint = self.endpoints[0] self.cert = cert self.protocol = protocol if cert: @@ -74,6 +80,7 @@ def __init__(self, host='127.0.0.1', port=2379, protocol='http', self.cluster_version = cluster_version self.api_spec = None self.api_prefix = '/v3alpha' + self.failover_whitelist = failover_whitelist self._retrieve_version() self._verify_version() self._get_prefix() @@ -123,7 +130,9 @@ def baseurl(self): """ :return: baseurl from protocol, host, self """ - return '{}://{}:{}'.format(self.protocol, self.host, self.port) + return '{}://{}:{}'.format(self.protocol, + self.current_endpoint.host, + self.current_endpoint.port) def _prefix(self, method): return self.api_prefix + method diff --git a/etcd3/client.py b/etcd3/client.py index 76430d2..3d04faa 100644 --- a/etcd3/client.py +++ b/etcd3/client.py @@ -6,8 +6,10 @@ import requests import six +from threading import Lock from .baseclient import BaseClient +from .utils import retry_all_hosts from .baseclient import BaseModelizedStreamResponse from .baseclient import DEFAULT_VERSION from .errors import Etcd3Exception @@ -86,8 +88,8 @@ def iter_response(resp): class Client(BaseClient): - def __init__(self, host='127.0.0.1', port=2379, protocol='http', - cert=(), verify=None, + def __init__(self, host=None, port=None, endpoints=None, + protocol='http', cert=(), verify=None, timeout=None, headers=None, user_agent=None, pool_size=30, username=None, password=None, token=None, max_retries=0, server_version=DEFAULT_VERSION, cluster_version=DEFAULT_VERSION): @@ -100,11 +102,13 @@ def __init__(self, host='127.0.0.1', port=2379, protocol='http', which we retry a request, import urllib3's ``Retry`` class and pass that instead. """ - super(Client, self).__init__(host=host, port=port, protocol=protocol, - cert=cert, verify=verify, - timeout=timeout, headers=headers, user_agent=user_agent, pool_size=pool_size, - username=username, password=password, token=token, - server_version=server_version, cluster_version=cluster_version) + self.current_endpoint_lock = Lock() + super(Client, self).__init__( + host=host, port=port, endpoints=endpoints, protocol=protocol, + cert=cert, verify=verify, timeout=timeout, headers=headers, + user_agent=user_agent, pool_size=pool_size, + username=username, password=password, token=token, + server_version=server_version, cluster_version=cluster_version) self._session = requests.session() self._session.cert = self.cert self._session.verify = self.verify @@ -164,6 +168,7 @@ def _post(self, url, data=None, json=None, **kwargs): """ return self._session.post(url, data=data, json=json, **kwargs) + @retry_all_hosts def call_rpc(self, method, data=None, stream=False, encode=True, raw=False, **kwargs): # TODO: add modelize param """ call ETCDv3 RPC and return response object diff --git a/etcd3/utils.py b/etcd3/utils.py index c935127..14db038 100644 --- a/etcd3/utils.py +++ b/etcd3/utils.py @@ -7,9 +7,18 @@ import sys import time import warnings +import copy +import inspect +import socket from collections import namedtuple, OrderedDict, Hashable from subprocess import Popen, PIPE from threading import Lock +from requests.exceptions import ChunkedEncodingError, ConnectTimeout +from urllib3.exceptions import MaxRetryError +try: + from httplib import IncompleteRead +except ImportError: + from http.client import IncompleteRead try: # pragma: no cover from inspect import getfullargspec as getargspec @@ -382,3 +391,70 @@ def find_executable(executable, path=None): # pragma: no cover f = os.path.join(p, execname) if os.path.isfile(f): return f + +failover_exceptions = (ChunkedEncodingError, + IncompleteRead, + ConnectTimeout, + MaxRetryError, + socket.timeout, + ) + +def retry_all_hosts(func): + def current_caller_name(): + previous_frame = inspect.currentframe().f_back.f_back + return inspect.getframeinfo(previous_frame).function + def wrapper(self, *args, **kwargs): + calling_function = current_caller_name() + if not calling_function in self.failover_whitelist: + log.debug('%s not in failover waitlist(%s)' % + (calling_function, self.failover_whitelist)) + try: + return func(self, *args, **kwargs) + except failover_exceptions as e: + if not calling_function in self.failover_whitelist: + log.debug('%s not in failover waitlist(%s)' % + (calling_function, self.failover_whitelist)) + raise e + errors = [] + got_result = False + with self.current_endpoint_lock: + # to exclude the current endpoint it should be saved + # before the call in the outmost `try`, and the + # whole wrapper should depend on the lock + for endpoint in self.endpoints: + self.current_endpoint = endpoint + try: + self.current_endpoint = endpoint + ret = func(self, *args, **kwargs) + got_result = True + break + except failover_exceptions as e: + errors.append(e) + log.warning('Failed to call %s(args: %s, kwargs: %s) on' + ' endpoint %s (%s)' % + (func.__name__, args, kwargs, endpoint, e)) + except Exception as e: + log.debug('received exception %s, not in ' + 'failover_exceptions(%s)' % + (e, failover_exceptions)) + if not got_result: + log.error('Failed to call %s(args: %s, kwargs: %s) on all ' + 'endpoints: %s. Got errors: %s' % + (func.__name__, args, kwargs, + call_endpoints, errors)) + exception_types = [x.__class__ for x in errors] + if len(set(exception_types)) == 1: + raise errors[0] + else: + raise Etcd3Exception('Failed failover') + return ret + return wrapper + + +class EtcdEndpoint(): + def __init__(self, host='127.0.0.1', port=2379): + self.host = host + self.port = port + + def __repr__(self): + return "EtcdEndpoint(host=%s, port=%s)" % (self.host, self.port) diff --git a/requirements_dev.txt b/requirements_dev.txt index e94c85c..a0414f3 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -19,3 +19,4 @@ m2r==0.2.1 codecov>=1.4.0 codacy-coverage==1.3.11 twine==1.13.0 +docker==3.7.0 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..97839d0 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,76 @@ +import six +from etcd3.client import Client +import pytest +from .etcd_cluster import EtcdTestCluster + + +@pytest.fixture(scope='session') +def etcd_cluster(request): + # function_name = request.function.__name__ + # function_name = re.sub(r"[^a-zA-Z0-9]+", "", function_name) + cluster = EtcdTestCluster(ident='cleartext', size=3) + + def fin(): + cluster.down() + request.addfinalizer(fin) + cluster.up() + cluster.wait_ready() + + return cluster + + +@pytest.fixture(scope='session') +def etcd_cluster_ssl(request): + # function_name = request.function.__name__ + # function_name = re.sub(r"[^a-zA-Z0-9]+", "", function_name) + cluster = EtcdTestCluster(ident='ssl', size=3, ssl=True) + + def fin(): + cluster.down() + request.addfinalizer(fin) + cluster.up() + cluster.wait_ready() + + return cluster + + +@pytest.fixture(scope='module') +def client(etcd_cluster): + """ + init Etcd3Client, close its connection-pool when teardown + """ + # _, p, _ = docker_run_etcd_main() + c = Client(endpoints=etcd_cluster.get_endpoints(), + protocol='https' if etcd_cluster.ssl else 'http') + yield c + c.close() + + +@pytest.fixture +def clear(etcd_cluster): + def _clear(): + etcd_cluster.etcdctl('del --from-key ""') + return _clear + + +def teardown_auth(etcd_cluster): # pragma: no cover + """ + disable auth, delete all users and roles + """ + etcd_cluster.etcdctl('--user root:root auth disable') + etcd_cluster.etcdctl('--user root:changed auth disable') + for i in (etcd_cluster.etcdctl('role list') or '').splitlines(): + if six.PY3: # pragma: no cover + i = six.text_type(i, encoding='utf-8') + etcd_cluster.etcdctl('role delete %s' % i) + for i in (etcd_cluster.etcdctl('user list') or '').splitlines(): + if six.PY3: # pragma: no cover + i = six.text_type(i, encoding='utf-8') + etcd_cluster.etcdctl('user delete %s' % i) + + +def enable_auth(etcd_cluster): # pragma: no cover + etcd_cluster.etcdctl('user add root:root') + etcd_cluster.etcdctl('role add root') + etcd_cluster.etcdctl('user grant root root') + etcd_cluster.etcdctl('auth enable') diff --git a/tests/docker_cli.py b/tests/docker_cli.py deleted file mode 100644 index e3916da..0000000 --- a/tests/docker_cli.py +++ /dev/null @@ -1,149 +0,0 @@ -import time - -import json -import os -import shlex -import six - -from etcd3.utils import find_executable, exec_cmd -from .envs import ETCD_IMG - -DOCKER_PATH = find_executable('docker') -CERTS_DIR = os.path.join(os.path.dirname(__file__), 'certs') - -SERVER_CA_PATH = '/certs/ca.pem' -SERVER_CERT_PATH = '/certs/server.pem' -SERVER_KEY_PATH = '/certs/server-key.pem' - -CA_PATH = os.path.join(CERTS_DIR, 'ca.pem') -CERT_PATH = os.path.join(CERTS_DIR, 'client.pem') -KEY_PATH = os.path.join(CERTS_DIR, 'client-key.pem') - - -def docker(*args, **kwargs): # pragma: no cover - if len(args) == 1: - args = shlex.split(args[0]) - raise_error = kwargs.get('raise_error', True) - envs = os.environ - cmd = [DOCKER_PATH] - cmd.extend(args) - # return subprocess.check_call(cmd, env=envs) - return exec_cmd(cmd, envs, raise_error) - - -# get the etcd server's http port and tcp peer port -def get_h_t(n): # pragma: no cover - h = 2379 + 2 * (n - 1) # the http port - t = h + 1 # the tcp peer port - return h, t - - -def docker_run_etcd_main(): # pragma: no cover - if NO_DOCKER_SERVICE: - return None, 2379, None - n = 1 - h, t = get_h_t(n) # 2379, 2380 - name = 'etcd3_1' - try: - out = docker('inspect %s' % name) - if isinstance(out, six.binary_type): - out = six.text_type(out, encoding='utf-8') - spec = json.loads(out) - if isinstance(spec, list): - spec = spec[0] - image = spec.get('Config', {}).get('Image') - if image != ETCD_IMG or not spec.get('State', {}).get('Running'): - if spec.get('Config', {}).get('Labels', {}).get('etcd3.py.test') == 'main': - print(docker('rm -f %s' % name)) - raise RuntimeError - cmds = spec.get('Config', {}).get('Cmd', []) - for i, c in enumerate(cmds): - if c == '--listen-client-urls': - h = int(cmds[i + 1].split(':')[-1]) - if c == '--listen-peer-urls': - t = int(cmds[i + 1].split(':')[-1]) - return '', h, t - except: - cmd = 'run -d -p {h}:{h} -p {t}:{t} --name {name} ' \ - '-l etcd3.py.test=main ' \ - '{img} ' \ - 'etcd --name node{n} ' \ - '--initial-advertise-peer-urls http://0.0.0.0:{t} ' \ - '--listen-peer-urls http://0.0.0.0:{t} ' \ - '--advertise-client-urls http://0.0.0.0:{h} ' \ - '--listen-client-urls http://0.0.0.0:{h} ' \ - '--initial-cluster node{n}=http://0.0.0.0:{t}'.format(name=name, n=n, h=h, t=t, img=ETCD_IMG) - print(cmd) - out = docker(cmd) - print(out) - time.sleep(5) - return out, h, t - - -def docker_run_etcd(n=2): # pragma: no cover - n = n or 2 # the node sequence - h, t = get_h_t(n) - cmd = 'run -d -p {h}:{h} -p {t}:{t} --name etcd3_{n} ' \ - '-l etcd3.py.test=node{n} ' \ - '{img} ' \ - 'etcd --name node{n} ' \ - '--initial-advertise-peer-urls http://0.0.0.0:{t} ' \ - '--listen-peer-urls http://0.0.0.0:{t} ' \ - '--advertise-client-urls http://0.0.0.0:{h} ' \ - '--listen-client-urls http://0.0.0.0:{h} ' \ - '--initial-cluster node{n}=http://0.0.0.0:{t}'.format(n=n, h=h, t=t, img=ETCD_IMG) - print(cmd) - return docker(cmd), h, t - - -def docker_rm_etcd(n=2): # pragma: no cover - n = n or 2 # the node sequence - cmd = 'rm -f etcd3_{n}'.format(n=n) - return docker(cmd) - - -def get_ssl_ht_t(): # pragma: no cover - return get_h_t(10) - - -def docker_run_etcd_ssl(): # pragma: no cover - n = 10 # the node sequence - h, t = get_h_t(n) - cmd = 'run -d -p {h}:{h} -p {t}:{t} --name etcd3_ssl -v {certs_dir}:/certs ' \ - '-l etcd3.py.test=node{n} ' \ - '{img} ' \ - 'etcd --name node_ssl ' \ - '--client-cert-auth ' \ - '--trusted-ca-file={ca} ' \ - '--cert-file={cer} ' \ - '--key-file={key} ' \ - '--initial-advertise-peer-urls http://0.0.0.0:{t} ' \ - '--listen-peer-urls http://0.0.0.0:{t} ' \ - '--advertise-client-urls https://0.0.0.0:{h} ' \ - '--listen-client-urls https://0.0.0.0:{h} ' \ - '--initial-cluster node_ssl=http://0.0.0.0:{t}' \ - .format(n=n, h=h, t=t, img=ETCD_IMG, - certs_dir=CERTS_DIR, - ca=SERVER_CA_PATH, - cer=SERVER_CERT_PATH, - key=SERVER_KEY_PATH) - print(cmd) - return docker(cmd), h, t - - -def docker_rm_etcd_ssl(raise_error=False): # pragma: no cover - cmd = 'rm -f etcd3_ssl' - return docker(cmd, raise_error=raise_error) - - -NO_DOCKER_SERVICE = True -try: # pragma: no cover - if DOCKER_PATH and docker('version'): - NO_DOCKER_SERVICE = False - else: - print("docker executable not found") -except Exception as e: # pragma: no cover - print(e) - -if __name__ == '__main__': - docker_run_etcd_ssl() diff --git a/tests/envs.py b/tests/envs.py index 5fef255..8621edf 100644 --- a/tests/envs.py +++ b/tests/envs.py @@ -1,6 +1,5 @@ import logging import os -from six.moves.urllib_parse import urlparse logging.basicConfig(format='%(name)s %(levelname)s - %(message)s') log = logging.getLogger() @@ -8,13 +7,25 @@ handler = logging.StreamHandler() log.addHandler(handler) -ETCD_ENDPOINT = os.getenv('ETCD_ENDPOINT') or 'http://localhost:2379' -_url = urlparse(ETCD_ENDPOINT) +ETCD_VER = os.getenv('ETCD_VER') or 'v3.3.0' -protocol = _url.scheme +ETCD_IMG = 'quay.io/coreos/etcd:' + ETCD_VER -host, port = _url.netloc.split(':') +DOCKER_PUBLISH_HOST = '127.0.0.1' -ETCD_VER = os.getenv('ETCD_VER') or 'v3.3.0' -ETCD_IMG = 'quay.io/coreos/etcd:' + ETCD_VER +CERTS_DIR = os.path.join(os.path.dirname(__file__), 'certs') +CA_PATH = os.path.join(CERTS_DIR, 'ca.pem') +CERT_PATH = os.path.join(CERTS_DIR, 'client.pem') +KEY_PATH = os.path.join(CERTS_DIR, 'client-key.pem') +SERVER_CA_PATH = '/certs/ca.pem' +SERVER_CERT_PATH = '/certs/server.pem' +SERVER_KEY_PATH = '/certs/server-key.pem' + +NO_DOCKER_SERVICE = True +try: # pragma: no cover + import docker # noqa + NO_DOCKER_SERVICE = False +except ImportError as e: # pragma: no cover + print("docker library not found") + print(e) diff --git a/tests/etcd_cluster.py b/tests/etcd_cluster.py new file mode 100644 index 0000000..ce277d8 --- /dev/null +++ b/tests/etcd_cluster.py @@ -0,0 +1,161 @@ +import docker +from etcd3.utils import EtcdEndpoint +from .envs import ETCD_IMG +from .envs import DOCKER_PUBLISH_HOST +from .envs import CERTS_DIR +from .envs import SERVER_CA_PATH +from .envs import SERVER_CERT_PATH +from .envs import SERVER_KEY_PATH +from time import sleep +import copy +import logging +import six + +log = logging.getLogger(__name__) + + +class EtcdTestCluster: + def __init__(self, ident, size, ssl=False): + self.containers = [] + self.network = None + self.ident = ident + self.size = size + self.ssl = ssl + self.client = docker.from_env() + + def etcdctl(self, command, container_idx=None): + if container_idx: + exec_containers = [self.containers[container_idx]] + else: + exec_containers = copy.copy(self.containers) + cmd = '%s %s' % (self.etcdctl_command(), command) + for c in exec_containers: + try: + out = c.exec_run(cmd) + if out.exit_code != 0: + log.warning('executing etcdctl command on %s returned %s' % + (cmd, out)) + continue + return out.output + except Exception as e: + log.warning('error executing etcdctl command %s on %s: %s' % + (cmd, c.name, e)) + raise Exception("error executing etcdctl command %s on all containers" % + cmd) + + def etcdctl_command(self): + command = "etcdctl" + if self.ssl: + command += " --key /certs/client-key.pem" + command += " --cert /certs/client.pem" + command += " --endpoints=https://127.0.0.1:2379" + command += " --insecure-skip-tls-verify" + return command + + def is_container_ready(self, container): + try: + command = '%s endpoint status' % self.etcdctl_command() + out = container.exec_run(command) + if out.exit_code != 0: + return False + return True + except Exception: + sleep(1) + return False + + def wait_container_ready(self, container): + while not self.is_container_ready(container): + sleep(0.5) + idx = self.containers.index(container) + # log.error(self.etcdctl("endpoint status")) + sleep(0.5) + + def wait_ready(self): + while True: + sleep(0.5) + out = self.etcdctl('version') + if six.PY3: # pragma: no cover + out = six.text_type(out, encoding='utf-8') + if 'not_decided' in out: + continue + out = self.etcdctl('member list') + if len([x for x in out.splitlines() if b'started' in x]) == self.size: + return + + def get_endpoints(self): + for c in self.containers: + c.reload() + return [EtcdEndpoint( + c.attrs['NetworkSettings']['Networks']["etcd-%s" % self.ident]['IPAddress'], + 2379) + for c in self.containers] + + def get_endpoints_param(self): + addresses = [ + c.attrs['NetworkSettings']['Networks']["etcd-%s" % self.ident]['IPAddress'] + for c in self.containers] + return "--endpoints=%s" % ",".join(["%s:2379" % a for a in addresses]) + + def down(self): + for c in self.containers: + c.remove(force=True) + if self.network: + self.network.remove() + + def rolling_restart(self): + for c in self.containers: + log.info('killing container %s' % c.name) + c.kill() + log.info('waiting for container %s to be ready' % c.name) + self.wait_container_ready(c) + + def up(self): + self.network = self.client.networks.create(name="etcd-%s" % self.ident) + image_found = False + for image in self.client.images.list(): + if ETCD_IMG in image.tags: + image_found = True + if not image_found: + log.info('pulling image %s' % ETCD_IMG) + self.client.images.pull(ETCD_IMG) + initial_cluster = ','.join( + ["etcd{x}-{n}=http://etcd{x}-{n}:2380".format(n=self.ident, x=x) + for x in range(self.size)]) + if self.ssl: + ssl_opts = [ + '--client-cert-auth', + '--cert-file=%s' % SERVER_CERT_PATH, + '--key-file=%s' % SERVER_KEY_PATH, + '--trusted-ca-file=%s' % SERVER_CA_PATH, + '--listen-client-urls=https://0.0.0.0:2379', + '--advertise-client-urls=https://0.0.0.0:2379'] + else: + ssl_opts = [ + '--listen-client-urls=http://0.0.0.0:2379', + '--advertise-client-urls=http://0.0.0.0:2379'] + self.containers = [ + self.client.containers.create( + name="etcd%s-%s" % (i, self.ident), + image=ETCD_IMG, + environment={ + 'ETCDCTL_API': '3', + }, + command=['etcd', '--name=etcd%s-%s' % (i, self.ident), + '--listen-peer-urls=http://0.0.0.0:2380', + '--initial-cluster', initial_cluster, + ] + ssl_opts, + ports={'2379/tcp': None}, + volumes={ + CERTS_DIR: {'bind': '/certs', 'mode': 'ro'}, + "/tmp/shared": {'bind': '/tmp/shared', 'mode': 'rw'}, + }, + network=self.network.name, + ) + for i in range(self.size)] + for c in self.containers: + log.debug('starting container %s' % c.name) + c.start() + for c in self.containers: + log.debug('wait for container %s to be ready' % c.name) + self.wait_container_ready(c) + c.reload() diff --git a/tests/etcd_go_cli.py b/tests/etcd_go_cli.py deleted file mode 100644 index 36ccaeb..0000000 --- a/tests/etcd_go_cli.py +++ /dev/null @@ -1,40 +0,0 @@ -import shlex - -from etcd3.utils import exec_cmd, find_executable -from tests.docker_cli import NO_DOCKER_SERVICE -from .envs import ETCD_ENDPOINT - -ETCDCTL_PATH = find_executable('etcdctl') - - -def etcdctl(*args, **kwargs): # pragma: no cover - if len(args) == 1: - args = shlex.split(args[0]) - json = kwargs.get('json', False) - endpoint = kwargs.get('endpoint', ETCD_ENDPOINT) - version = kwargs.get('version', 3) - raise_error = kwargs.get('raise_error', True) - - envs = {} - cmd = [ETCDCTL_PATH, '--endpoints', endpoint] - if json: - cmd.extend(['-w', 'json']) - if version == 3: - envs['ETCDCTL_API'] = '3' - cmd.extend(args) - return exec_cmd(cmd, envs, raise_error) - - -NO_ETCD_SERVICE = True -if not NO_DOCKER_SERVICE: - NO_ETCD_SERVICE = False -try: # pragma: no cover - if ETCDCTL_PATH and etcdctl('--dial-timeout=0.2s endpoint health'): - NO_ETCD_SERVICE = False - else: - print("etcdctl executable not found") -except Exception as e: # pragma: no cover - print(e) - -if __name__ == '__main__': - print(etcdctl('get foo')) diff --git a/tests/test_auth_apis.py b/tests/test_auth_apis.py index a6d0b71..c5b0a8b 100644 --- a/tests/test_auth_apis.py +++ b/tests/test_auth_apis.py @@ -3,53 +3,22 @@ import pytest -from etcd3.client import Client from etcd3.errors import ErrAuthNotEnabled from etcd3.errors import ErrRoleNotFound from etcd3.errors import ErrRootUserNotExist from etcd3.errors import ErrUserNotFound from etcd3.models import authpbPermissionType from etcd3.utils import incr_last_byte -from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host, ETCD_VER -from .etcd_go_cli import etcdctl, NO_ETCD_SERVICE - - -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() - - -def teardown_auth(client): # pragma: no cover - """ - disable auth, delete all users and roles - """ - etcdctl('--user root:root auth disable', raise_error=False, endpoint=client.baseurl) - etcdctl('--user root:changed auth disable', raise_error=False, endpoint=client.baseurl) - for i in (etcdctl('role list', raise_error=False, endpoint=client.baseurl) or '').splitlines(): - etcdctl('role', 'delete', i, endpoint=client.baseurl) - for i in (etcdctl('user list', raise_error=False, endpoint=client.baseurl) or '').splitlines(): - etcdctl('user', 'delete', i, endpoint=client.baseurl) - - -def enable_auth(): # pragma: no cover - etcdctl('user add root:root') - etcdctl('role add root') - etcdctl('user grant root root') - etcdctl('auth enable') - - -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") +from .envs import ETCD_VER +from .envs import NO_DOCKER_SERVICE +from .conftest import teardown_auth + + +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") # @pytest.mark.skipif(re.match(r'v3\.[0-2]\.{0,1}', ETCD_VER), reason="etcd < v3.3.0 does not support auth header") -def test_auth_flow(client, request): - teardown_auth(client) - request.addfinalizer(lambda: teardown_auth(client)) +def test_auth_flow(client, etcd_cluster, request): + teardown_auth(etcd_cluster) + request.addfinalizer(lambda: teardown_auth(etcd_cluster)) # test error with pytest.raises(ErrRootUserNotExist): diff --git a/tests/test_client.py b/tests/test_client.py index fc4d0c0..d964ba0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,39 +1,23 @@ -import time - import pytest - -from etcd3.client import Client from etcd3.errors import Etcd3Exception -from .docker_cli import CA_PATH, CERT_PATH, KEY_PATH, NO_DOCKER_SERVICE, docker_run_etcd_main -from .docker_cli import docker_run_etcd_ssl, docker_rm_etcd_ssl -from .envs import protocol, host -from .etcd_go_cli import etcdctl, NO_ETCD_SERVICE +from etcd3 import Client +from .envs import CA_PATH, CERT_PATH, KEY_PATH +from .envs import NO_DOCKER_SERVICE from .mocks import fake_request -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() - - -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_request_and_model(client): - etcdctl('put test_key test_value') +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_request_and_model(client, etcd_cluster): + etcd_cluster.etcdctl('put test_key test_value') result = client.call_rpc('/kv/range', {'key': 'test_key'}) # {"header":{"cluster_id":11588568905070377092,"member_id":128088275939295631,"revision":3,"raft_term":2},"kvs":[{"key":"dGVzdF9rZXk=","create_revision":3,"mod_revision":3,"version":1,"value":"dGVzdF92YWx1ZQ=="}],"count":1}' assert result.kvs[0].key == b'test_key' assert result.kvs[0].value == b'test_value' - etcdctl('del test_key') + etcd_cluster.etcdctl('del test_key') -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_stream(client): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_stream(client, etcd_cluster): times = 20 created = False with client.call_rpc('/watch', {'create_request': {'key': 'test_key'}}, stream=True) as r: @@ -43,14 +27,14 @@ def test_stream(client): if not created: created = i.created assert created - etcdctl('put test_key test_value') + etcd_cluster.etcdctl('put test_key test_value') if i.events: assert i.events[0].kv.key == b'test_key' assert i.events[0].kv.value == b'test_value' times -= 1 -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") def test_request_exception(client): with pytest.raises(Etcd3Exception, match=r".*'Not Found'.*"): client.call_rpc('/kv/rag', {}) # non exist path @@ -99,10 +83,15 @@ def test_patched_request_exception(client, monkeypatch): @pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") -def test_client_ssl(): - docker_rm_etcd_ssl() - _, port, _ = docker_run_etcd_ssl() - time.sleep(2) - client = Client(host, port, cert=(CERT_PATH, KEY_PATH), verify=CA_PATH) +def test_client_ssl(etcd_cluster_ssl): + client = Client(endpoints=etcd_cluster_ssl.get_endpoints(), + cert=(CERT_PATH, KEY_PATH), verify=False) + assert client.version() + + +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_client_host_port(etcd_cluster_ssl): + endpoint = etcd_cluster_ssl.get_endpoints()[0] + client = Client(host=endpoint.host, port=endpoint.port, + cert=(CERT_PATH, KEY_PATH), verify=False) assert client.version() - docker_rm_etcd_ssl() diff --git a/tests/test_extra_apis.py b/tests/test_extra_apis.py index f40df1c..5d4da43 100644 --- a/tests/test_extra_apis.py +++ b/tests/test_extra_apis.py @@ -1,22 +1,6 @@ -import pytest - -from etcd3.client import Client -from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host from .mocks import fake_request -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() - - def test_version_api(client, monkeypatch): s = b'{"etcdserver":"3.3.0-rc.4","etcdcluster":"3.3.0"}' monkeypatch.setattr(client._session, 'get', fake_request(200, s)) diff --git a/tests/test_kv_apis.py b/tests/test_kv_apis.py index e333339..3e28393 100644 --- a/tests/test_kv_apis.py +++ b/tests/test_kv_apis.py @@ -2,39 +2,20 @@ import pytest import six -from etcd3.client import Client from etcd3.models import RangeRequestSortOrder from etcd3.models import RangeRequestSortTarget -from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host -from .etcd_go_cli import NO_ETCD_SERVICE -from .etcd_go_cli import etcdctl +from .envs import NO_DOCKER_SERVICE -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() - - -def clear(): - etcdctl('del', '--from-key', '') - - -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_range(client, request): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_range(client, etcd_cluster, clear, request): clear() request.addfinalizer(clear) # test get - etcdctl('put /foo1 v2') + etcd_cluster.etcdctl('put /foo1 v2') assert client.range('/foo1').kvs[0].value == b'v2' - etcdctl('put /foo2 v1') + etcd_cluster.etcdctl('put /foo2 v1') assert client.range('/foo2').kvs[0].value == b'v1' # test prefix and sort @@ -50,7 +31,7 @@ def test_range(client, request): assert r.kvs[1].key == b'/foo2' # test all - etcdctl('put some_key_else v') + etcd_cluster.etcdctl('put some_key_else v') r = client.range(all=True) assert len(r.kvs) >= 3 @@ -59,8 +40,8 @@ def test_range(client, request): assert len(r.kvs) == 2 -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_put(client, request): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_put(client, clear, request): clear() request.addfinalizer(clear) client.put('foo', 'bar') @@ -69,8 +50,8 @@ def test_put(client, request): assert client.range('foo').kvs[0].value == b'bra' -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_delete(client, request): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_delete(client, clear, request): clear() request.addfinalizer(clear) client.put('foo', 'bar') @@ -80,22 +61,22 @@ def test_delete(client, request): assert not client.range('fo', prefix=True).kvs -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_compact(client, request): - out = etcdctl('put some thing', json=True) +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_compact(client, etcd_cluster, request): + out = etcd_cluster.etcdctl('-w json put some thing') if six.PY3: # pragma: no cover out = six.text_type(out, encoding='utf-8') rev = json.loads(out)['header']['revision'] assert client.compact(rev, physical=False) -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_txn(client, request): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_txn(client, etcd_cluster, clear, request): clear() request.addfinalizer(clear) - etcdctl('put flag ok') - etcdctl('put foo bar') - etcdctl('put fizz buzz') + etcd_cluster.etcdctl('put flag ok') + etcd_cluster.etcdctl('put foo bar') + etcd_cluster.etcdctl('put fizz buzz') compare = [{ "result": "EQUAL", @@ -118,9 +99,9 @@ def test_txn(client, request): assert not client.range('fizz').kvs assert client.range('foo').kvs[0].value == b'bra' - etcdctl('put flag ok') - etcdctl('put foo bar') - etcdctl('put fizz buzz') + etcd_cluster.etcdctl('put flag ok') + etcd_cluster.etcdctl('put foo bar') + etcd_cluster.etcdctl('put fizz buzz') compare = [{ "result": "EQUAL", diff --git a/tests/test_lease_apis.py b/tests/test_lease_apis.py index 43ed222..5a8994e 100644 --- a/tests/test_lease_apis.py +++ b/tests/test_lease_apis.py @@ -1,40 +1,24 @@ import random import time - import pytest - -from etcd3.client import Client -from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host, port -from .etcd_go_cli import NO_ETCD_SERVICE, etcdctl - - -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() +from .envs import NO_DOCKER_SERVICE -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") def test_hash(client): assert client.hash().hash -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_lease_flow(client): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_lease_flow(client, etcd_cluster): ID = random.randint(10000, 100000) TTL = 60 r = client.lease_grant(TTL, ID=ID) assert r.ID == ID hexid = hex(ID)[2:] - etcdctl('put --lease=%s foo bar' % hexid) - etcdctl('put --lease=%s fizz buzz' % hexid) + etcd_cluster.etcdctl('put --lease=%s foo bar' % hexid) + etcd_cluster.etcdctl('put --lease=%s fizz buzz' % hexid) time.sleep(1) r = client.lease_time_to_live(ID, keys=True) assert r.ID == ID diff --git a/tests/test_lease_util.py b/tests/test_lease_util.py index 733e0e5..f7f2217 100644 --- a/tests/test_lease_util.py +++ b/tests/test_lease_util.py @@ -4,32 +4,18 @@ import pytest import random -from etcd3.client import Client -from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host -from .etcd_go_cli import NO_ETCD_SERVICE, etcdctl +from .envs import NO_DOCKER_SERVICE -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() - - -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_lease_util(client): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_lease_util(client, etcd_cluster): ID = random.randint(10000, 100000) TTL = 2 # min is 2sec lease = client.Lease(ttl=TTL, ID=ID) with lease: hexid = hex(ID)[2:] - etcdctl('put --lease=%s foo bar' % hexid) - etcdctl('put --lease=%s fizz buzz' % hexid) + etcd_cluster.etcdctl('put --lease=%s foo bar' % hexid) + etcd_cluster.etcdctl('put --lease=%s fizz buzz' % hexid) time.sleep(TTL) r = lease.time_to_live(keys=True) assert r.ID == ID @@ -55,6 +41,7 @@ def test_lease_util(client): lease.grant() lease.keepalive(keep_cb=keep_cb, cancel_cb=cancel_cb) lease.cancel_keepalive() + time.sleep(1) assert keep_cb.called assert cancel_cb.called diff --git a/tests/test_lock_apis.py b/tests/test_lock_apis.py index e6edae6..699272b 100644 --- a/tests/test_lock_apis.py +++ b/tests/test_lock_apis.py @@ -3,21 +3,7 @@ import pytest -from etcd3.client import Client -from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host -from .etcd_go_cli import NO_ETCD_SERVICE, etcdctl - - -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() +from .envs import NO_DOCKER_SERVICE class context: @@ -25,16 +11,12 @@ def __init__(self): self.exit = False -def clear(): - etcdctl('del', '--from-key', '') - - KEY = 'test-lock' @pytest.mark.timeout(60) -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_lock_flow(client): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_lock_flow(client, clear): clear() holds = {} diff --git a/tests/test_lock_util.py b/tests/test_lock_util.py index 7f44a40..6a82658 100644 --- a/tests/test_lock_util.py +++ b/tests/test_lock_util.py @@ -4,34 +4,19 @@ import pytest -from etcd3 import Client, Lock -from .envs import protocol, host, port -from .etcd_go_cli import NO_ETCD_SERVICE, etcdctl +from etcd3 import Lock +from .envs import NO_DOCKER_SERVICE logging.getLogger().setLevel(logging.DEBUG) -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - c = Client(host, port, protocol) - yield c - c.close() - - class context: def __init__(self): self.exit = False -def clear(): - etcdctl('del', '--from-key', '') - - -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_lock(client): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_lock(client, clear): clear() holds = {} @@ -89,8 +74,8 @@ def hold(lock, name, ctx): assert l2.holders() == 0 -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_reentrant_lock_host(client): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_reentrant_lock_host(client, clear): clear() holds = {} @@ -149,8 +134,8 @@ def hold(lock, name, ctx): @pytest.mark.timeout(60) -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_reentrant_lock(client): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_reentrant_lock(client, clear): clear() holds = {} diff --git a/tests/test_maintenance_apis.py b/tests/test_maintenance_apis.py index afd4315..85985db 100644 --- a/tests/test_maintenance_apis.py +++ b/tests/test_maintenance_apis.py @@ -2,58 +2,47 @@ import pytest import six -from etcd3 import Client from etcd3.models import etcdserverpbAlarmType -from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host -from .etcd_go_cli import NO_ETCD_SERVICE, etcdctl +from .envs import NO_DOCKER_SERVICE -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() - - -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") def test_hash(client): assert client.hash().hash -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") def test_status(client): assert client.status().version -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") def test_defragment(client): assert client.defragment() -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") def test_alarm(client): assert client.alarm_activate(0, etcdserverpbAlarmType.NOSPACE) assert client.alarm_get(0, etcdserverpbAlarmType.NOSPACE) assert client.alarm_deactivate(0, etcdserverpbAlarmType.NOSPACE) -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_snapshot(client): - out = etcdctl('put some thing', json=True) +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_snapshot(client, etcd_cluster): + # write some data into etcd + for i in range(10): + etcd_cluster.etcdctl('put key value%s' % i) + out = etcd_cluster.etcdctl('-w json put some thing') if six.PY3: # pragma: no cover out = six.text_type(out, encoding='utf-8') rev = json.loads(out)['header']['revision'] - etcdctl('compaction --physical %s' % (rev - 10), raise_error=False) - etcdctl('defrag') + etcd_cluster.etcdctl('compaction --physical %s' % (rev - 10)) + etcd_cluster.etcdctl('defrag') r = client.snapshot() - with open('/tmp/etcd-snap.db', 'wb') as f: + with open('/tmp/shared/etcd-snap.db', 'wb') as f: for i in r: assert i.blob f.write(i.blob) - assert etcdctl('snapshot status /tmp/etcd-snap.db') + assert etcd_cluster.etcdctl('snapshot status /tmp/shared/etcd-snap.db') diff --git a/tests/test_py3/conftest.py b/tests/test_py3/conftest.py new file mode 100644 index 0000000..eb01814 --- /dev/null +++ b/tests/test_py3/conftest.py @@ -0,0 +1,22 @@ +import pytest +from etcd3 import AioClient + + +@pytest.fixture +async def aio_client(event_loop, request, etcd_cluster): + """ + init Etcd3Client, close its connection-pool when teardown + """ + + c = AioClient(endpoints=etcd_cluster.get_endpoints(), + protocol='https' if etcd_cluster.ssl else 'http') + + def teardown(): + async def _t(): + await c.close() + + event_loop.run_until_complete(_t()) + event_loop._close() + + request.addfinalizer(teardown) + return c diff --git a/tests/test_py3/test_aio_auth_apis.py b/tests/test_py3/test_aio_auth_apis.py index 6a1406b..b0b3381 100644 --- a/tests/test_py3/test_aio_auth_apis.py +++ b/tests/test_py3/test_aio_auth_apis.py @@ -4,11 +4,10 @@ import pytest import re -from etcd3 import AioClient -from tests.docker_cli import docker_run_etcd_main -from ..envs import protocol, host, ETCD_VER -from ..etcd_go_cli import NO_ETCD_SERVICE -from ..etcd_go_cli import etcdctl +from ..envs import ETCD_VER +from ..envs import NO_DOCKER_SERVICE +from ..conftest import teardown_auth +from ..conftest import enable_auth @pytest.fixture @@ -22,50 +21,12 @@ def event_loop(): # pragma: no cover return res -@pytest.fixture -async def aio_client(event_loop, request): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = AioClient(host, p, protocol) - - def teardown(): - async def _t(): - await c.close() - - event_loop.run_until_complete(_t()) - event_loop._close() - - request.addfinalizer(teardown) - return c - - -def teardown_auth(): # pragma: no cover - """ - disable auth, delete all users and roles - """ - etcdctl('--user root:root auth disable', raise_error=False) - etcdctl('--user root:changed auth disable', raise_error=False) - for i in (etcdctl('role list', raise_error=False) or '').splitlines(): - etcdctl('role', 'delete', i) - for i in (etcdctl('user list', raise_error=False) or '').splitlines(): - etcdctl('user', 'delete', i) - - -def enable_auth(): # pragma: no cover - etcdctl('user add root:root') - etcdctl('role add root') - etcdctl('user grant root root') - etcdctl('auth enable') - - -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") @pytest.mark.asyncio -async def test_async_client_auth(aio_client, request): - teardown_auth() - enable_auth() - request.addfinalizer(teardown_auth) +async def test_async_client_auth(aio_client, etcd_cluster, request): + teardown_auth(etcd_cluster) + enable_auth(etcd_cluster) + request.addfinalizer(lambda: teardown_auth(etcd_cluster)) # test client auth await aio_client.auth('root', 'root') diff --git a/tests/test_py3/test_aio_client.py b/tests/test_py3/test_aio_client.py index 59c4025..c9a2217 100644 --- a/tests/test_py3/test_aio_client.py +++ b/tests/test_py3/test_aio_client.py @@ -1,14 +1,10 @@ import asyncio -import time - import pytest - from etcd3 import AioClient -from ..docker_cli import docker_rm_etcd_ssl, docker_run_etcd_ssl, CERT_PATH, KEY_PATH, CA_PATH, NO_DOCKER_SERVICE, \ - docker_run_etcd_main -from ..envs import protocol, host -from ..etcd_go_cli import NO_ETCD_SERVICE -from ..etcd_go_cli import etcdctl +from ..envs import CERT_PATH +from ..envs import KEY_PATH +from ..envs import CA_PATH +from ..envs import NO_DOCKER_SERVICE @pytest.fixture @@ -22,40 +18,20 @@ def event_loop(): # pragma: no cover return res -@pytest.fixture -async def aio_client(event_loop, request): - """ - init Etcd3Client, close its connection-pool when teardown - """ - - _, p, _ = docker_run_etcd_main() - c = AioClient(host, p, protocol) - - def teardown(): - async def _t(): - await c.close() - - event_loop.run_until_complete(_t()) - event_loop._close() - - request.addfinalizer(teardown) - return c - - -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") @pytest.mark.asyncio -async def test_async_request_and_model(aio_client): - etcdctl('put test_key test_value') +async def test_async_request_and_model(aio_client, etcd_cluster): + etcd_cluster.etcdctl('put test_key test_value') result = await aio_client.call_rpc('/kv/range', {'key': 'test_key'}) # {"header":{"cluster_id":11588568905070377092,"member_id":128088275939295631,"revision":3,"raft_term":2},"kvs":[{"key":"dGVzdF9rZXk=","create_revision":3,"mod_revision":3,"version":1,"value":"dGVzdF92YWx1ZQ=="}],"count":1}' assert result.kvs[0].key == b'test_key' assert result.kvs[0].value == b'test_value' - etcdctl('del test_key') + etcd_cluster.etcdctl('del test_key') -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") @pytest.mark.asyncio -async def test_async_stream(aio_client): +async def test_async_stream(aio_client, etcd_cluster): times = 20 created = False # async with and with both works @@ -66,7 +42,7 @@ async def test_async_stream(aio_client): if not created: created = i.created assert created - etcdctl('put test_key test_value') + etcd_cluster.etcdctl('put test_key test_value') if i.events: assert i.events[0].kv.key == b'test_key' assert i.events[0].kv.value == b'test_value' @@ -75,10 +51,16 @@ async def test_async_stream(aio_client): @pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") @pytest.mark.asyncio -async def test_aio_client_ssl(): - docker_rm_etcd_ssl() - _, port, _ = docker_run_etcd_ssl() - time.sleep(2) - aio_client = AioClient(host, port, cert=(CERT_PATH, KEY_PATH), verify=CA_PATH) +async def test_aio_client_ssl(etcd_cluster_ssl): + aio_client = AioClient(endpoints=etcd_cluster_ssl.get_endpoints(), + cert=(CERT_PATH, KEY_PATH), verify=False) + assert await aio_client.call_rpc('/kv/range', {'key': 'test_key'}) + + +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +@pytest.mark.asyncio +async def test_aio_client_host_port(etcd_cluster_ssl): + endpoint = etcd_cluster_ssl.get_endpoints()[0] + aio_client = AioClient(host=endpoint.host, port=endpoint.port, + cert=(CERT_PATH, KEY_PATH), verify=False) assert await aio_client.call_rpc('/kv/range', {'key': 'test_key'}) - docker_rm_etcd_ssl() diff --git a/tests/test_transaction_util.py b/tests/test_transaction_util.py index 154f8c2..9da6c42 100644 --- a/tests/test_transaction_util.py +++ b/tests/test_transaction_util.py @@ -1,26 +1,12 @@ import pytest -from etcd3 import Client -from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host -from .etcd_go_cli import etcdctl, NO_ETCD_SERVICE - - -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() +from .envs import NO_DOCKER_SERVICE @pytest.mark.timeout(60) -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_transaction(client): - etcdctl('put foo bar') +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_transaction(client, etcd_cluster): + etcd_cluster.etcdctl('put foo bar') txn = client.Txn() txn.compare(txn.key('foo').value == 'bar') txn.success(txn.put('foo', 'bra')) @@ -35,7 +21,7 @@ def test_transaction(client): txn.commit() assert client.range('foo').kvs[0].value == b'bar' - etcdctl('put foo 2') + etcd_cluster.etcdctl('put foo 2') txn = client.Txn() txn.If(txn.key('foo').value > b'1') txn.If(txn.key('foo').value < b'3') @@ -45,8 +31,8 @@ def test_transaction(client): assert r.succeeded assert client.range('foo').kvs[0].value == b'bra' - etcdctl('put foo bar') - etcdctl('put fizz buzz') + etcd_cluster.etcdctl('put foo bar') + etcd_cluster.etcdctl('put fizz buzz') txn = client.Txn() txn.success(txn.range('foo')) txn.success(txn.delete('fizz')) diff --git a/tests/test_watch_apis.py b/tests/test_watch_apis.py index 0f086ec..4baeb70 100644 --- a/tests/test_watch_apis.py +++ b/tests/test_watch_apis.py @@ -1,29 +1,10 @@ import pytest -from etcd3.client import Client -from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host -from .etcd_go_cli import NO_ETCD_SERVICE -from .etcd_go_cli import etcdctl +from .envs import NO_DOCKER_SERVICE -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() - - -def clear(): - etcdctl('del', '--from-key', '') - - -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_watch_api(client, request): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_watch_api(client, clear, etcd_cluster, request): clear() request.addfinalizer(clear) @@ -36,7 +17,7 @@ def test_watch_api(client, request): if not created: created = i.created assert created - etcdctl('put test_key test_value') + etcd_cluster.etcdctl('put test_key test_value') if i.events: assert i.events[0].kv.key == b'test_key' assert i.events[0].kv.value == b'test_value' @@ -51,7 +32,7 @@ def test_watch_api(client, request): if not created: created = i.created assert created - etcdctl('put test_key test_value') + etcd_cluster.etcdctl('put test_key test_value') if i.events: assert i.events[0].kv.key == b'test_key' assert i.events[0].kv.value == b'test_value' @@ -66,7 +47,7 @@ def test_watch_api(client, request): if not created: created = i.created assert created - etcdctl('put test_key test_value') + etcd_cluster.etcdctl('put test_key test_value') if i.events: assert i.events[0].kv.key == b'test_key' assert i.events[0].kv.value == b'test_value' diff --git a/tests/test_watch_util.py b/tests/test_watch_util.py index f811f24..4ad4f23 100644 --- a/tests/test_watch_util.py +++ b/tests/test_watch_util.py @@ -3,26 +3,13 @@ import pytest import socket -from etcd3 import Client, EventType -from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host -from .etcd_go_cli import etcdctl, NO_ETCD_SERVICE - - -@pytest.fixture(scope='module') -def client(): - """ - init Etcd3Client, close its connection-pool when teardown - """ - _, p, _ = docker_run_etcd_main() - c = Client(host, p, protocol) - yield c - c.close() +from etcd3 import EventType +from .envs import NO_DOCKER_SERVICE @pytest.mark.timeout(60) -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") -def test_watcher(client): +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_watcher(client, etcd_cluster): max_retries = 3 w = client.Watcher(all=True, progress_notify=True, prev_kv=True, max_retries=max_retries) @@ -51,12 +38,12 @@ def test_watcher(client): assert w.watching assert w._thread.is_alive - etcdctl('put foo bar') - etcdctl('put foo bar') - etcdctl('del foo') - etcdctl('put fizz buzz') - etcdctl('put fizz buzz') - etcdctl('put fizz buzz') + etcd_cluster.etcdctl('put foo bar') + etcd_cluster.etcdctl('put foo bar') + etcd_cluster.etcdctl('del foo') + etcd_cluster.etcdctl('put fizz buzz') + etcd_cluster.etcdctl('put fizz buzz') + etcd_cluster.etcdctl('put fizz buzz') time.sleep(1) w.stop() @@ -70,8 +57,8 @@ def test_watcher(client): assert len(put_list) == 5 assert len(all_list) == 6 - etcdctl('put foo bar') - etcdctl('put fizz buzz') + etcd_cluster.etcdctl('put foo bar') + etcd_cluster.etcdctl('put fizz buzz') foo_list = [] fizz_list = [] @@ -89,13 +76,13 @@ def test_watcher(client): times = 3 with w: - etcdctl('put foo bar') + etcd_cluster.etcdctl('put foo bar') for e in w: if not times: break assert e.key == b'foo' assert e.value == b'bar' - etcdctl('put foo bar') + etcd_cluster.etcdctl('put foo bar') times -= 1 assert not w.watching assert w._resp.raw.closed @@ -120,3 +107,45 @@ def test_watcher(client): assert not w.watching assert w._resp.raw.closed assert len(w.errors) == max_retries + + +@pytest.mark.timeout(60) +@pytest.mark.skipif(NO_DOCKER_SERVICE, reason="no docker service available") +def test_watcher_failover(client, etcd_cluster): + w = client.Watcher(all=True, progress_notify=True, prev_kv=True) + client.timeout = 2 + w.set_default_timeout(2) + put_list = [] + w.onEvent(EventType.PUT, lambda e: put_list.append(e)) + + assert len(w.callbacks) == 1 + + w.runDaemon() + time.sleep(0.2) + live = w._thread.is_alive() + + with pytest.raises(RuntimeError): + w.runDaemon() + with pytest.raises(RuntimeError): + w.run() + + assert w.watching + assert w._thread.is_alive + assert client.current_endpoint == client.endpoints[0] + + # hard rolling restart with write and calls to status after killing each node + for i in range(3): + c = etcd_cluster.containers[i%3] + c.kill() + etcd_cluster.etcdctl("put key%s val" % i) + while not len(put_list) == i+1: + time.sleep(0.5) + client.status() + c.restart() + assert w.watching + etcd_cluster.wait_ready() + w.stop() + + assert w._resp.raw.closed + # ensure all events have been reported + assert len(put_list) == 3