diff --git a/ecs_deploy/cli.py b/ecs_deploy/cli.py index 679fe93..4e183c1 100644 --- a/ecs_deploy/cli.py +++ b/ecs_deploy/cli.py @@ -6,7 +6,6 @@ import click import json import getpass -from datetime import datetime, timedelta from botocore.exceptions import ClientError from ecs_deploy import VERSION from ecs_deploy.ecs import DeployAction, ScaleAction, RunAction, EcsClient, DiffAction, \ @@ -26,8 +25,7 @@ def get_client(access_key_id, secret_access_key, region, profile, assume_account @click.command() -@click.argument('cluster') -@click.argument('service') +@click.argument('cluster-service', nargs=-1) @click.option('-t', '--tag', help='Changes the tag for ALL container images') @click.option('-i', '--image', type=(str, str), multiple=True, help='Overwrites the image for a container: ') @click.option('-c', '--command', type=(str, str), multiple=True, help='Overwrites the command in a container: ') @@ -85,11 +83,12 @@ def get_client(access_key_id, secret_access_key, region, profile, assume_account @click.option('--volume', type=(str, str), multiple=True, required=False, help='Set volume mapping from host to container in the task definition.') @click.option('--add-container', type=str, multiple=True, required=False, help='Add a placeholder container in the task definition.') @click.option('--remove-container', type=str, multiple=True, required=False, help='Remove a container from the task definition.') -def deploy(cluster, service, tag, image, command, health_check, cpu, memory, memoryreservation, task_cpu, task_memory, privileged, essential, env, env_file, s3_env_file, secret, secrets_env_file, ulimit, system_control, port, mount, log, role, execution_role, runtime_platform, task, region, access_key_id, secret_access_key, profile, account, assume_role, timeout, newrelic_apikey, newrelic_appid, newrelic_region, newrelic_revision, comment, user, ignore_warnings, diff, deregister, rollback, exclusive_env, exclusive_secrets, exclusive_s3_env_file, sleep_time, 
exclusive_ulimits, exclusive_system_controls, exclusive_ports, exclusive_mounts, volume, add_container, remove_container, slack_url, docker_label, exclusive_docker_labels, slack_service_match='.*'): +def deploy(cluster_service, tag, image, command, health_check, cpu, memory, memoryreservation, task_cpu, task_memory, privileged, essential, env, env_file, s3_env_file, secret, secrets_env_file, ulimit, system_control, port, mount, log, role, execution_role, runtime_platform, task, region, access_key_id, secret_access_key, profile, account, assume_role, timeout, newrelic_apikey, newrelic_appid, newrelic_region, newrelic_revision, comment, user, ignore_warnings, diff, deregister, rollback, exclusive_env, exclusive_secrets, exclusive_s3_env_file, sleep_time, exclusive_ulimits, exclusive_system_controls, exclusive_ports, exclusive_mounts, volume, add_container, remove_container, slack_url, docker_label, exclusive_docker_labels, slack_service_match='.*'): """ Redeploy or modify a service. \b + CLUSTER_SERVICE is the space separated pair of your CLUSTER and SERVICE. Multiple pairs can be given. CLUSTER is the name of your cluster (e.g. 'my-cluster') within ECS. SERVICE is the name of your service (e.g. 'my-app') within ECS. @@ -99,59 +98,65 @@ def deploy(cluster, service, tag, image, command, health_check, cpu, memory, mem """ try: client = get_client(access_key_id, secret_access_key, region, profile, account, assume_role) - deployment = DeployAction(client, cluster, service) - td = get_task_definition(deployment, task) - # If there is a new container, add it at frist. 
- td.add_containers(add_container) - td.remove_containers(remove_container) - td.set_images(tag, **{key: value for (key, value) in image}) - td.set_commands(**{key: value for (key, value) in command}) - td.set_health_checks(health_check) - td.set_cpu(**{key: value for (key, value) in cpu}) - td.set_memory(**{key: value for (key, value) in memory}) - td.set_memoryreservation(**{key: value for (key, value) in memoryreservation}) - td.set_task_cpu(task_cpu) - td.set_task_memory(task_memory) - td.set_privileged(**{key: value for (key, value) in privileged}) - td.set_essential(**{key: value for (key, value) in essential}) - td.set_environment(env, exclusive_env, env_file) - td.set_docker_labels(docker_label, exclusive_docker_labels) - td.set_s3_env_file(s3_env_file, exclusive_s3_env_file) - td.set_secrets(secret, exclusive_secrets, secrets_env_file) - td.set_ulimits(ulimit, exclusive_ulimits) - td.set_system_controls(system_control, exclusive_system_controls) - td.set_port_mappings(port, exclusive_ports) - td.set_mount_points(mount, exclusive_mounts) - td.set_log_configurations(log) - td.set_role_arn(role) - td.set_execution_role_arn(execution_role) - td.set_runtime_platform(runtime_platform) - td.set_volumes(volume) + deployments = [] + num_pairs = len(cluster_service) // 2 + for i in range(num_pairs): + cluster, service = cluster_service[i * 2], cluster_service[i * 2 + 1] + + deployment = DeployAction(client, cluster, service) + + td = deployment.get_task_definition(task) + # If there is a new container, add it at first. 
+ td.add_containers(add_container) + td.remove_containers(remove_container) + td.set_images(tag, **{key: value for (key, value) in image}) + td.set_commands(**{key: value for (key, value) in command}) + td.set_health_checks(health_check) + td.set_cpu(**{key: value for (key, value) in cpu}) + td.set_memory(**{key: value for (key, value) in memory}) + td.set_memoryreservation(**{key: value for (key, value) in memoryreservation}) + td.set_task_cpu(task_cpu) + td.set_task_memory(task_memory) + td.set_privileged(**{key: value for (key, value) in privileged}) + td.set_essential(**{key: value for (key, value) in essential}) + td.set_environment(env, exclusive_env, env_file) + td.set_docker_labels(docker_label, exclusive_docker_labels) + td.set_s3_env_file(s3_env_file, exclusive_s3_env_file) + td.set_secrets(secret, exclusive_secrets, secrets_env_file) + td.set_ulimits(ulimit, exclusive_ulimits) + td.set_system_controls(system_control, exclusive_system_controls) + td.set_port_mappings(port, exclusive_ports) + td.set_mount_points(mount, exclusive_mounts) + td.set_log_configurations(log) + td.set_role_arn(role) + td.set_execution_role_arn(execution_role) + td.set_runtime_platform(runtime_platform) + td.set_volumes(volume) + + slack = SlackNotification( + getenv('SLACK_URL', slack_url), + getenv('SLACK_SERVICE_MATCH', slack_service_match) + ) + slack.notify_start(cluster, tag, td, comment, user, service=service) - slack = SlackNotification( - getenv('SLACK_URL', slack_url), - getenv('SLACK_SERVICE_MATCH', slack_service_match) - ) - slack.notify_start(cluster, tag, td, comment, user, service=service) + click.secho('Deploying based on task definition: %s\n' % td.family_revision) - click.secho('Deploying based on task definition: %s\n' % td.family_revision) + if diff: + print_diff(td) - if diff: - print_diff(td) + new_td = create_task_definition(deployment, td) - new_td = create_task_definition(deployment, td) + deployments.append((deployment, td, new_td)) try: - 
deploy_task_definition( - deployment=deployment, - task_definition=new_td, + deploy_task_definitions( + deployments=deployments, title='Deploying new task definition', success_message='Deployment successful', failure_message='Deployment failed', timeout=timeout, deregister=deregister, - previous_task_definition=td, ignore_warnings=ignore_warnings, sleep_time=sleep_time ) @@ -160,7 +165,7 @@ def deploy(cluster, service, tag, image, command, health_check, cpu, memory, mem slack.notify_failure(cluster, str(e), service=service) if rollback: click.secho('%s\n' % str(e), fg='red', err=True) - rollback_task_definition(deployment, td, new_td, sleep_time=sleep_time) + rollback_task_definitions(deployments, sleep_time=sleep_time) exit(1) else: raise @@ -387,7 +392,7 @@ def scale(cluster, service, desired_count, access_key_id, secret_access_key, reg fg='green' ) wait_for_finish( - action=scaling, + actions=[scaling], timeout=timeout, title='Scaling service', success_message='Scaling successful', @@ -518,61 +523,44 @@ def diff(task, revision_a, revision_b, region, access_key_id, secret_access_key, exit(1) -def wait_for_finish(action, timeout, title, success_message, failure_message, +def wait_for_finish(actions, timeout, title, success_message, failure_message, ignore_warnings, sleep_time=1): click.secho(title) - start_timestamp = datetime.now() - waiting_timeout = datetime.now() + timedelta(seconds=timeout) - service = action.get_service() - inspected_until = None if timeout == -1: - waiting = False - else: - waiting = True + click.secho("Timeout disabled. 
Fire and forget.") + return - while waiting and datetime.now() < waiting_timeout: - click.secho('.', nl=False) - service = action.get_service() - inspected_until = inspect_errors( - service=service, - failure_message=failure_message, - ignore_warnings=ignore_warnings, - since=inspected_until, - timeout=False - ) - waiting = not action.is_deployed(service) - - if waiting: - sleep(sleep_time) + while actions: + # Traverse in reverse order to remove finished actions + for i, action in list(enumerate(actions))[::-1]: + if action.has_finished(timeout, failure_message, ignore_warnings): + click.secho('\n%s (%s)' % (success_message, action.service_name), fg='green') + click.secho('Duration: %s sec\n' % action.get_duration()) - inspect_errors( - service=service, - failure_message=failure_message, - ignore_warnings=ignore_warnings, - since=inspected_until, - timeout=waiting - ) + actions.pop(i) - click.secho('\n%s' % success_message, fg='green') - click.secho('Duration: %s sec\n' % (datetime.now() - start_timestamp).seconds) + click.secho('.', nl=False) + sleep(sleep_time) -def deploy_task_definition(deployment, task_definition, title, success_message, +def deploy_task_definitions(deployments, title, success_message, failure_message, timeout, deregister, - previous_task_definition, ignore_warnings, sleep_time): - click.secho('Updating service') - deployment.deploy(task_definition) + ignore_warnings, sleep_time): - message = 'Successfully changed task definition to: %s:%s\n' % ( - task_definition.family, - task_definition.revision - ) + for deployment, _, new_td in deployments: + click.secho('Updating %s' % deployment.service_name) + deployment.deploy(new_td) - click.secho(message, fg='green') + message = 'Successfully changed task definition to: %s:%s\n' % ( + new_td.family, + new_td.revision + ) + + click.secho(message, fg='green') wait_for_finish( - action=deployment, + actions=[action for action, _, _ in deployments], timeout=timeout, title=title, 
success_message=success_message, @@ -582,15 +570,8 @@ def deploy_task_definition(deployment, task_definition, title, success_message, ) if deregister: - deregister_task_definition(deployment, previous_task_definition) - - -def get_task_definition(action, task): - if task: - task_definition = action.get_task_definition(task) - else: - task_definition = action.get_current_task_definition(action.service) - return task_definition + for deployment, old_td, _ in deployments: + deregister_task_definition(deployment, old_td) def create_task_definition(action, task_definition): @@ -614,26 +595,25 @@ def deregister_task_definition(action, task_definition): ) -def rollback_task_definition(deployment, old, new, timeout=600, sleep_time=1): +def rollback_task_definitions(deployments, timeout=600, sleep_time=1): + task_definitions = ", ".join(old_td.family_revision for _, old_td, _ in deployments) click.secho( - 'Rolling back to task definition: %s\n' % old.family_revision, + 'Rolling back to task definitions: %s\n' % task_definitions, fg='yellow', ) - deploy_task_definition( - deployment=deployment, - task_definition=old, + deploy_task_definitions( + deployments=[(deployment, new_td, old_td) for deployment, old_td, new_td in deployments], title='Deploying previous task definition', success_message='Rollback successful', failure_message='Rollback failed. 
Please check ECS Console', timeout=timeout, deregister=True, - previous_task_definition=new, ignore_warnings=False, sleep_time=sleep_time ) click.secho( - 'Deployment failed, but service has been rolled back to previous ' - 'task definition: %s\n' % old.family_revision, fg='yellow', err=True + 'Deployment failed, but services have been rolled back to previous ' + 'task definitions: %s\n' % task_definitions, fg='yellow', err=True ) @@ -669,51 +649,6 @@ def print_diff(task_definition, title='Updating task definition'): click.secho('') -def inspect_errors(service, failure_message, ignore_warnings, since, timeout): - error = False - last_error_timestamp = since - warnings = service.get_warnings(since) - for timestamp in warnings: - message = warnings[timestamp] - click.secho('') - if ignore_warnings: - last_error_timestamp = timestamp - click.secho( - '%s\nWARNING: %s' % (timestamp, message), - fg='yellow', - err=False - ) - click.secho('Continuing.', nl=False) - else: - click.secho( - '%s\nERROR: %s\n' % (timestamp, message), - fg='red', - err=True - ) - error = True - - if service.older_errors: - click.secho('') - click.secho('Older errors', fg='yellow', err=True) - for timestamp in service.older_errors: - click.secho( - text='%s\n%s\n' % (timestamp, service.older_errors[timestamp]), - fg='yellow', - err=True - ) - - if timeout: - error = True - failure_message += ' due to timeout. 
Please see: ' \ - 'https://github.com/fabfuel/ecs-deploy#timeout' - click.secho('') - - if error: - raise TaskPlacementError(failure_message) - - return last_error_timestamp - - ecs.add_command(deploy) ecs.add_command(scale) ecs.add_command(run) diff --git a/ecs_deploy/ecs.py b/ecs_deploy/ecs.py index 2fb4824..833f092 100644 --- a/ecs_deploy/ecs.py +++ b/ecs_deploy/ecs.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timedelta import json import re import copy @@ -1331,10 +1331,11 @@ def get_service(self): service_definition=services_definition[u'services'][0] ) - def get_current_task_definition(self, service): - return self.get_task_definition(service.task_definition) def get_task_definition(self, task_definition): + if not task_definition: + task_definition = self._service.task_definition + task_definition_payload = self._client.describe_task_definition( task_definition_arn=task_definition ) @@ -1424,18 +1425,78 @@ def service_name(self): return self._service_name -class DeployAction(EcsAction): +class TrackableProgress: + def init_progresstracker(self): + self.start_timestamp = datetime.now() + self.stop_timestamp = None + self.inspected_until = None + + def has_finished(self, timeout, failure_message, ignore_warnings): + waiting_timeout = self.start_timestamp + timedelta(seconds=timeout) + if datetime.now() > waiting_timeout: + raise TaskPlacementError(failure_message + " due to timeout. 
Please see: https://github.com/fabfuel/ecs-deploy#timeout") + + service = self.get_service() + self.inspect_warnings(service, failure_message, ignore_warnings) + + has_finished = self.is_deployed(service) + + if has_finished: + self.stop_timestamp = datetime.now() + return has_finished + + def inspect_warnings(self, service, failure_message, ignore_warnings): + + warnings = service.get_warnings(self.inspected_until) + if not warnings: + return + + if ignore_warnings: + logger.warning('') + else: + logger.error('') + + error = False + + for timestamp in warnings: + message = warnings[timestamp] + if ignore_warnings: + self.inspected_until = timestamp + logger.warning('%s: %s' % (timestamp, message)) + logger.warning('Continuing.') + else: + error = True + logger.error('%s: %s' % (timestamp, message)) + + if service.older_errors: + logger.error('') + logger.error('Older errors') + for timestamp in service.older_errors: + logger.error('%s: %s' % (timestamp, service.older_errors[timestamp])) + + if error: + raise TaskPlacementError(failure_message) + + def get_duration(self): + if not self.stop_timestamp: + raise ValueError("Action not finished yet.") + return (self.stop_timestamp - self.start_timestamp).seconds + + +class DeployAction(TrackableProgress, EcsAction): def deploy(self, task_definition): try: self._service.set_task_definition(task_definition) + self.init_progresstracker() return self.update_service(self._service) except ClientError as e: raise EcsError(str(e)) -class ScaleAction(EcsAction): +class ScaleAction(TrackableProgress, EcsAction): def scale(self, desired_count): try: + self.init_progresstracker() return self.update_service(self._service, desired_count) except ClientError as e: raise EcsError(str(e)) diff --git a/tests/test_cli.py b/tests/test_cli.py index 96bff7b..5dc15a9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,3 +1,4 @@ +import re from datetime import datetime import pytest @@ -244,7 +245,7 @@ def 
test_deploy_one_new_health_check(get_client, cmd_input, cmd_expected, runner assert u'Successfully deregistered revision: 1' in result.output assert u'Successfully changed task definition to: test-task:2' in result.output assert u'Deployment successful' in result.output - + @patch('ecs_deploy.cli.get_client') @@ -635,8 +636,7 @@ def test_deploy_with_errors(get_client, runner): result = runner.invoke(cli.deploy, (CLUSTER_NAME, SERVICE_NAME)) assert result.exit_code == 1 assert u"Deployment failed" in result.output - assert u"ERROR: Service was unable to Lorem Ipsum" in result.output - + assert re.search("error: [0-9-: \\.+]+Service was unable to Lorem Ipsum", result.output) @patch('ecs_deploy.cli.get_client') def test_deploy_with_client_errors(get_client, runner): @@ -657,7 +657,7 @@ def test_deploy_ignore_warnings(get_client, runner): assert u'Successfully created revision: 2' in result.output assert u'Successfully deregistered revision: 1' in result.output assert u'Successfully changed task definition to: test-task:2' in result.output - assert u"WARNING: Service was unable to Lorem Ipsum" in result.output + assert re.search("warning: [0-9-: \\.+]+Service was unable to Lorem Ipsum", result.output) assert u"Continuing." in result.output assert u'Deployment successful' in result.output @@ -781,7 +781,7 @@ def test_deploy_with_wait_within_timeout(get_client, runner): result = runner.invoke(cli.deploy, (CLUSTER_NAME, SERVICE_NAME, '--timeout', '10')) assert result.exit_code == 0 assert u'Deploying new task definition' in result.output - assert u'...' in result.output + assert u'..' 
in result.output @patch('ecs_deploy.cli.get_client') @@ -849,7 +849,7 @@ def test_scale_with_errors(get_client, runner): result = runner.invoke(cli.scale, (CLUSTER_NAME, SERVICE_NAME, '2')) assert result.exit_code == 1 assert u"Scaling failed" in result.output - assert u"ERROR: Service was unable to Lorem Ipsum" in result.output + assert re.search("error: [0-9-: \\.+]+Service was unable to Lorem Ipsum", result.output) @patch('ecs_deploy.cli.get_client') @@ -868,7 +868,7 @@ def test_scale_ignore_warnings(get_client, runner): assert not result.exception assert result.exit_code == 0 assert u"Successfully changed desired count to: 2" in result.output - assert u"WARNING: Service was unable to Lorem Ipsum" in result.output + assert re.search("warning: [0-9-: \\.+]+Service was unable to Lorem Ipsum", result.output) assert u"Continuing." in result.output assert u"Scaling successful" in result.output diff --git a/tests/test_ecs.py b/tests/test_ecs.py index ac75650..01636a5 100644 --- a/tests/test_ecs.py +++ b/tests/test_ecs.py @@ -1466,15 +1466,13 @@ def test_ecs_action_get_service(): @patch.object(EcsClient, '__init__') -def test_ecs_action_get_current_task_definition(client, service): +def test_ecs_action_get_task_definition(client, service): client.describe_task_definition.return_value = RESPONSE_TASK_DEFINITION action = EcsAction(client, u'test-cluster', u'test-service') - task_definition = action.get_current_task_definition(service) + task_definition = action.get_task_definition(None) - client.describe_task_definition.assert_called_once_with( - task_definition_arn=service.task_definition - ) + client.describe_task_definition.assert_called_once() assert isinstance(task_definition, EcsTaskDefinition) assert task_definition.family == u'test-task'