From dccb91774f828fac4a5a7dd3950a3fbc7a3ba56b Mon Sep 17 00:00:00 2001 From: aandrieiev Date: Mon, 3 Aug 2015 18:33:30 +0200 Subject: [PATCH 1/3] Add build serialization --- config/dev.exs | 5 +- config/prod.exs | 21 +- config/test.exs | 27 +- lib/workflow_orchestrator/configuration.ex | 47 +- lib/workflow_orchestrator/workflow.ex | 142 +++--- lib/workflow_orchestrator/workflow_fsm.ex | 123 ++++-- test/stubs/workflow_api_stub.exs | 58 +++ .../workflow_fsm_test.exs | 412 +++++++++++++----- 8 files changed, 572 insertions(+), 263 deletions(-) create mode 100644 test/stubs/workflow_api_stub.exs diff --git a/config/dev.exs b/config/dev.exs index 8e033a3..8113f2e 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -16,4 +16,7 @@ use Mix.Config # metadata: [:user_id] config :autostart, - register_queues: true \ No newline at end of file + register_queues: true + +config :openaperture_workflow_orchestrator, + queued_builds_check_delay: "15000" diff --git a/config/prod.exs b/config/prod.exs index ada1582..46b0046 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -16,15 +16,18 @@ use Mix.Config # metadata: [:user_id] config :autostart, - register_queues: true + register_queues: true -config :openaperture_manager_api, - manager_url: System.get_env("MANAGER_URL"), - oauth_login_url: System.get_env("OAUTH_LOGIN_URL"), - oauth_client_id: System.get_env("OAUTH_CLIENT_ID"), - oauth_client_secret: System.get_env("OAUTH_CLIENT_SECRET") +config :openaperture_manager_api, + manager_url: System.get_env("MANAGER_URL"), + oauth_login_url: System.get_env("OAUTH_LOGIN_URL"), + oauth_client_id: System.get_env("OAUTH_CLIENT_ID"), + oauth_client_secret: System.get_env("OAUTH_CLIENT_SECRET") config :openaperture_overseer_api, - module_type: :workflow_orchestrator, - exchange_id: System.get_env("EXCHANGE_ID"), - broker_id: System.get_env("BROKER_ID") \ No newline at end of file + module_type: :workflow_orchestrator, + exchange_id: System.get_env("EXCHANGE_ID"), + broker_id: System.get_env("BROKER_ID") + +config :openaperture_workflow_orchestrator, + queued_builds_check_delay: "15000" # ms diff --git a/config/test.exs b/config/test.exs index af84eda..b4636dc 100644 --- a/config/test.exs +++ b/config/test.exs @@ -16,20 +16,21 @@ use Mix.Config # metadata: [:user_id] config :autostart, - register_queues: false + register_queues: false -config :openaperture_manager_api, - manager_url: "https://openaperture-mgr.host.co", - oauth_login_url: "https://auth.host.co", - oauth_client_id: "id", - oauth_client_secret: "secret" +config :openaperture_manager_api, + manager_url: "https://openaperture-mgr.host.co", + oauth_login_url: "https://auth.host.co", + oauth_client_id: "id", + oauth_client_secret: "secret" -config :openaperture_workflow_orchestrator, - exchange_id: "1", - broker_id: "1" +config :openaperture_workflow_orchestrator, + exchange_id: "1", + broker_id: "1", + queued_builds_check_delay: "3" config :openaperture_overseer_api, - module_type: :test, - autostart: false, - exchange_id: "1", - broker_id: "1" \ No newline at end of file + module_type: :test, + autostart: false, + exchange_id: "1", + broker_id: "1" diff --git a/lib/workflow_orchestrator/configuration.ex b/lib/workflow_orchestrator/configuration.ex index 5e5814a..2b8f141 100644 --- a/lib/workflow_orchestrator/configuration.ex +++ b/lib/workflow_orchestrator/configuration.ex @@ -7,13 +7,13 @@ defmodule OpenAperture.WorkflowOrchestrator.Configuration do @doc """ Method to retrieve the currently assigned exchange id - + ## Options - + ## Return values The exchange identifier - """ + """ @spec get_current_exchange_id() :: String.t() def get_current_exchange_id do get_config("EXCHANGE_ID", :openaperture_workflow_orchestrator, :exchange_id) @@ -21,13 +21,13 @@ defmodule OpenAperture.WorkflowOrchestrator.Configuration do @doc """ Method to retrieve the currently assigned exchange id - + ## Options - + ## Return values The exchange identifier - """ + """ @spec get_current_broker_id() :: String.t() def get_current_broker_id do get_config("BROKER_ID", :openaperture_workflow_orchestrator, :broker_id) @@ -35,13 +35,13 @@ defmodule OpenAperture.WorkflowOrchestrator.Configuration do @doc """ Method to retrieve the currently assigned queue name (for "workflow_orchestration") - + ## Options - + ## Return values The exchange identifier - """ + """ @spec get_current_queue_name() :: String.t() def get_current_queue_name do get_config("QUEUE_NAME", :openaperture_overseer, :queue_name) @@ -49,35 +49,44 @@ defmodule OpenAperture.WorkflowOrchestrator.Configuration do @doc """ Method to retrieve the associated UI's url - + ## Options - + ## Return values The exchange identifier - """ + """ @spec get_ui_url() :: String.t() def get_ui_url do get_config("UI_URL", :openaperture_overseer, :ui_url) end + def get_queue_build_delay do + {delay, _} = + "QUEUED_BUILDS_CHECK_DELAY" + |> get_config(:openaperture_workflow_orchestrator, :queued_builds_check_delay) + |> Integer.parse + + delay + end + @doc false # Method to retrieve a configuration option from the environment or config settings - # + # ## Options - # + # # The `env_name` option defines the environment variable name # # The `application_config` option defines the config application name (atom) # # The `config_name` option defines the config variable name (atom) - # + # ## Return values - # + # # Value - # + # @spec get_config(String.t(), term, term) :: String.t() defp get_config(env_name, application_config, config_name) do System.get_env(env_name) || Application.get_env(application_config, config_name) - end -end \ No newline at end of file + end +end diff --git a/lib/workflow_orchestrator/workflow.ex b/lib/workflow_orchestrator/workflow.ex index 2a32fd9..ecee3c5 100644 --- a/lib/workflow_orchestrator/workflow.ex +++ b/lib/workflow_orchestrator/workflow.ex @@ -11,7 +11,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do @moduledoc """ This module contains the for interacting with a Workflow - """ + """ alias OpenAperture.WorkflowOrchestrator.Notifications.Publisher, as: NotificationsPublisher @@ -31,7 +31,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do Method to create a Workflow ## Options - + The `payload` option defines the Workflow info that should be stored and referenced ## Return values @@ -52,7 +52,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do if workflow_info[:workflow_start_time] == nil do workflow_info = Map.put(workflow_info, :workflow_start_time, Time.now()) end - + if workflow_info[:workflow_completed] == nil do workflow_info = Map.put(workflow_info, :workflow_completed, false) end @@ -66,16 +66,16 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do end case Agent.start_link(fn -> workflow_info end) do - {:ok, pid} -> pid - {:error, reason} -> {:error, "Failed to create Workflow Agent: #{inspect reason}"} - end + {:ok, pid} -> pid + {:error, reason} -> {:error, "Failed to create Workflow Agent: #{inspect reason}"} + end end @doc """ Method to retrieve the id from a workflow ## Options - + The `workflow` option defines the Workflow referenced ## Return values @@ -84,14 +84,14 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do """ @spec get_id(pid) :: String.t() def get_id(workflow) do - get_info(workflow)[:id] + get_info(workflow)[:id] end @doc """ Method to retrieve the info associated with a workflow ## Options - + The `workflow` option defines the Workflow referenced ## Return values @@ -100,14 +100,14 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do """ @spec get_info(pid) :: Map def get_info(workflow) do - Agent.get(workflow, fn info -> info end) + Agent.get(workflow, fn info -> info end) end @doc """ Method to determine if a Workflow is completed ## Options - + The `workflow` option defines the Workflow referenced ## Return values @@ -116,7 +116,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do """ @spec complete?(pid) :: term def complete?(workflow) do - completed = get_info(workflow)[:workflow_completed] + completed = get_info(workflow)[:workflow_completed] if completed != nil do completed else @@ -128,7 +128,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do Method to determine if a Workflow has completed in error ## Options - + The `workflow` option defines the Workflow referenced ## Return values @@ -149,7 +149,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do Method to resolve the next Workflow step ## Options - + The `workflow` option defines the Workflow referenced ## Return values @@ -164,7 +164,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do Logger.debug("Resolving next milestone for Workflow #{workflow_info[:id]}, current step is #{inspect current_step}") if current_step == nil do - resolved_workflow_info = send_success_notification(workflow_info, "Starting workflow...") + resolved_workflow_info = send_success_notification(workflow_info, "Starting workflow...") else resolved_workflow_info = workflow_info @@ -212,7 +212,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do Method to append a "success" notification to the Workflow's log ## Options - + The `workflow_info` option defines the Workflow info Map The `message` option defines the message to publish @@ -232,7 +232,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do Method to append a "failure" notification to the Workflow's log ## Options - + The `workflow_info` option defines the Workflow info Map The `message` option defines the message to publish @@ -252,7 +252,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do Method to publish a "success" notification ## Options - + The `workflow_info` option defines the Workflow info Map The `message` option defines the message to publish @@ -262,15 +262,15 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do Map, containing the updated workflow_info """ @spec send_success_notification(Map, String.t()) :: Map - def send_success_notification(workflow_info, message) do - send_notification(workflow_info, true, message) - end + def send_success_notification(workflow_info, message) do + send_notification(workflow_info, true, message) + end @doc """ Method to publish a "failure" notification ## Options - + The `workflow_info` option defines the Workflow info Map The `message` option defines the message to publish @@ -280,15 +280,15 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do Map, containing the updated workflow_info """ @spec send_failure_notification(Map, String.t()) :: Map - def send_failure_notification(workflow_info, message) do - send_notification(workflow_info, false, message) - end + def send_failure_notification(workflow_info, message) do + send_notification(workflow_info, false, message) + end @doc """ Method to publish a notification ## Options - + The `workflow_info` option defines the Workflow info Map The `is_success` option defines failure/success of the message @@ -300,8 +300,8 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do Map, containing the updated workflow_info """ @spec send_notification(Map, term, String.t()) :: Map - def send_notification(workflow_info, is_success, message) do - prefix = build_notification_prefix(workflow_info) + def send_notification(workflow_info, is_success, message) do + prefix = build_notification_prefix(workflow_info) Logger.debug("#{prefix} #{message}") workflow_info = add_event_to_log(workflow_info, message, prefix) @@ -310,7 +310,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do NotificationsPublisher.hipchat_notification(is_success, prefix, message, room_names) workflow_info - end + end @spec build_notification_prefix(Map) :: String.t() defp build_notification_prefix(workflow_info) do @@ -359,9 +359,9 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do :ok | {:error, reason} """ @spec save(pid) :: :ok | {:error, String.t()} - def save(workflow) do - try do - workflow_info = get_info(workflow) + def save(workflow) do + try do + workflow_info = get_info(workflow) workflow_error = workflow_info[:workflow_error] if workflow_error == nil && workflow_info[:workflow_completed] != nil do @@ -385,27 +385,27 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do workflow_completed: workflow_info[:workflow_completed], event_log: workflow_info[:event_log], } - + case WorkflowAPI.update_workflow(ManagerApi.get_api, workflow_info[:id], workflow_payload) do %Response{status: 204} -> :ok - %Response{status: status} -> + %Response{status: status} -> error_message = "Failed to save workflow; server returned #{status}" Logger.error(error_message) {:error, error_message} - end + end catch - :exit, code -> + :exit, code -> error_message = "Failed to save workflow; Exited with code #{inspect code}" - Logger.error(error_message) + Logger.error(error_message) {:error, error_message} - :throw, value -> + :throw, value -> error_message = "Failed to save workflow; Throw called with #{inspect value}" {:error, error_message} - what, value -> + what, value -> error_message = "Failed to save workflow; Caught #{inspect what} with #{inspect value}" {:error, error_message} - end - end + end + end @doc """ Method to determine the next workflow step, based on the current state of the workflow @@ -423,49 +423,49 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do if workflow_info[:milestones] == nil || length(workflow_info[:milestones]) == 0 do nil else - current_step = workflow_info[:current_step] + current_step = workflow_info[:current_step] current_step_atom = if current_step == nil || is_atom(current_step) do current_step else String.to_atom(current_step) end - - {_, next_step} = Enum.reduce workflow_info[:milestones], {false, nil}, fn(available_step, {use_next_step, next_step})-> + + {_, next_step} = Enum.reduce workflow_info[:milestones], {false, nil}, fn(available_step, {use_next_step, next_step})-> available_step_atom = if available_step == nil || is_atom(available_step) do available_step else String.to_atom(available_step) end - #we already found the next step - if (next_step != nil) do - {false, next_step} - else - #we're just starting the workflow - if (current_step_atom == nil) do - {false, available_step_atom} - else - if (use_next_step) do - {false, available_step_atom} - else - #the current item in the list is the current workflow step - if (current_step_atom == available_step_atom) do - {true, next_step} - else - #the current item in the list is NOT the current workflow step - {false, next_step} - end - end - end - end - end + #we already found the next step + if (next_step != nil) do + {false, next_step} + else + #we're just starting the workflow + if (current_step_atom == nil) do + {false, available_step_atom} + else + if (use_next_step) do + {false, available_step_atom} + else + #the current item in the list is the current workflow step + if (current_step_atom == available_step_atom) do + {true, next_step} + else + #the current item in the list is NOT the current workflow step + {false, next_step} + end + end + end + end + end if next_step == nil || is_atom(next_step) do next_step else String.to_atom(next_step) end end - end + end @doc """ Method to complete a Workflow in failure @@ -482,7 +482,7 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do """ @spec workflow_failed(pid, String.t()) :: :ok | {:error, String.t()} def workflow_failed(workflow, reason) do - workflow_info = get_info(workflow) + workflow_info = get_info(workflow) workflow_info = send_failure_notification(workflow_info, "Workflow Milestone Failed: #{inspect workflow_info[:current_step]}. Reason: #{reason}") workflow_info = Map.merge(workflow_info, %{workflow_completed: true}) workflow_info = Map.merge(workflow_info, %{workflow_error: true}) @@ -496,10 +496,10 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do workflow_step_durations = %{} end - timestamp = TimexExtensions.get_elapsed_timestamp(workflow_info[:step_time]) + timestamp = TimexExtensions.get_elapsed_timestamp(workflow_info[:step_time]) workflow_step_durations = Map.put(workflow_step_durations, to_string(workflow_info[:current_step]), timestamp) workflow_info = Map.put(workflow_info, :workflow_step_durations, workflow_step_durations) - workflow_info = send_success_notification(workflow_info, "Completed Workflow Milestone: #{inspect workflow_info[:current_step]}, in #{timestamp}") + workflow_info = send_success_notification(workflow_info, "Completed Workflow Milestone: #{inspect workflow_info[:current_step]}, in #{timestamp}") Agent.update(workflow, fn _ -> workflow_info end) save(workflow) @@ -553,4 +553,4 @@ defmodule OpenAperture.WorkflowOrchestrator.Workflow do NotificationsPublisher.email_notification(subject,body,recipients) end end -end \ No newline at end of file +end diff --git a/lib/workflow_orchestrator/workflow_fsm.ex b/lib/workflow_orchestrator/workflow_fsm.ex index b2063bb..3225654 100755 --- a/lib/workflow_orchestrator/workflow_fsm.ex +++ b/lib/workflow_orchestrator/workflow_fsm.ex @@ -1,7 +1,7 @@ # # == workflow_fsm.ex # -# This module contains the gen_fsm for Workflow Orchestration. Most executions +# This module contains the gen_fsm for Workflow Orchestration. Most executions # through this FSM will follow one of the following path(s): # # * Workflow is complete @@ -18,6 +18,7 @@ # require Logger require Timex.Date +require Timex.DateFormat defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do @@ -39,6 +40,9 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do alias OpenAperture.WorkflowOrchestratorApi.Request, as: OrchestratorRequest + alias OpenAperture.ManagerApi + alias ManagerApi.Workflow, as: WorkflowAPI + @doc """ Method to start a WorkflowFSM @@ -49,16 +53,16 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `delivery_tag` options defines the identifier of the request message to which this FSM is associated ## Return Values - + {:ok, WorkflowFSM} | {:error, reason} """ @spec start_link(Map, String.t()) :: {:ok, pid} | {:error, String.t()} - def start_link(payload, delivery_tag) do + def start_link(payload, delivery_tag) do case Workflow.create_from_payload(payload) do {:error, reason} -> {:error, reason} workflow -> :gen_fsm.start_link(__MODULE__, %{workflow: workflow, delivery_tag: delivery_tag}, []) end - end + end @doc """ Method to execute a (rescursive) run of the WorkflowFSM @@ -68,7 +72,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `workflowfsm` option defines the PID of the FSM ## Return Values - + :completed """ @spec execute(pid) :: :completed @@ -87,15 +91,15 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `state_data` option contains the default state data of the :gen_fsm server ## Return Values - + {:ok, :workflow_starting, state_data} """ @spec init(pid) :: {:ok, :workflow_starting, Map} - def init(state_data) do + def init(state_data) do state_data = Map.put(state_data, :workflow_fsm_id, "#{UUID.uuid1()}") state_data = Map.put(state_data, :workflow_fsm_prefix, "[WorkflowFSM][#{state_data[:workflow_fsm_id]}][Workflow][#{Workflow.get_id(state_data[:workflow])}]") {:ok, :workflow_starting, state_data} - end + end @doc """ :gen_fsm callback - http://www.erlang.org/doc/man/gen_fsm.html#Module:terminate-3 @@ -109,14 +113,14 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `state_data` option contains the default state data of the :gen_fsm server ## Return Values - + :ok """ @spec terminate(term, term, Map) :: :ok - def terminate(_reason, _current_state, state_data) do - Logger.debug("#{state_data[:workflow_fsm_prefix]} Workflow Orchestration has finished normally") - :ok - end + def terminate(_reason, _current_state, state_data) do + Logger.debug("#{state_data[:workflow_fsm_prefix]} Workflow Orchestration has finished normally") + :ok + end @doc """ :gen_fsm callback - http://www.erlang.org/doc/man/gen_fsm.html#Module:code_change-4 @@ -132,7 +136,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `opts` option contains additional extra options ## Return Values - + {:ok, term, Map} """ @spec code_change(term, term, Map, term) :: {:ok, term, Map} @@ -152,13 +156,13 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `state_data` option contains the default state data of the :gen_fsm server ## Return Values - + {:next_state,term,Map} """ @spec handle_info(term, term, Map) :: {:next_state,term,Map} def handle_info(_info, current_state, state_data) do {:next_state,current_state,state_data} - end + end @doc """ :gen_fsm callback - http://www.erlang.org/doc/man/gen_fsm.html#Module:handle_event-3 @@ -172,13 +176,13 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `state_data` option contains the default state data of the :gen_fsm server ## Return Values - + {:next_state,term,Map} """ @spec handle_event(term, term, Map) :: {:next_state,term,Map} def handle_event(_event, current_state, state_data) do {:next_state,current_state,state_data} - end + end @doc """ :gen_fsm callback - http://www.erlang.org/doc/man/gen_fsm.html#Module:handle_sync_event-4 @@ -194,13 +198,13 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `state_data` option contains the default state data of the :gen_fsm server ## Return Values - + {:next_state,term,Map} """ @spec handle_sync_event(term, term, term, Map) :: {:next_state,term,Map} def handle_sync_event(_event, _from, current_state, state_data) do {:next_state,current_state,state_data} - end + end @doc """ :gen_fsm callback - http://www.erlang.org/doc/man/gen_fsm.html#Module:StateName-3 for the state :workflow_starting @@ -218,7 +222,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `state_data` option contains the default state data of the :gen_fsm server ## Return Values - + {:reply, :in_progress, next_milestone, state_data} """ @spec workflow_starting(term, term, Map) :: {:reply, :in_progress, term, Map} @@ -229,7 +233,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do Workflow.save(state_data[:workflow]) case Workflow.complete?(state_data[:workflow]) do - true -> + true -> if Workflow.failed?(state_data[:workflow]) do Workflow.workflow_failed(state_data[:workflow], "Milestone worker has reported a failure") else @@ -237,7 +241,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do end {:reply, :in_progress, :workflow_completed, state_data} - false -> + false -> Logger.debug("#{state_data[:workflow_fsm_prefix]} Workflow has not finished, resolving next milestone...") {:reply, :in_progress, Workflow.resolve_next_milestone(state_data[:workflow]), state_data} end @@ -257,7 +261,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `state_data` option contains the default state data of the :gen_fsm server ## Return Values - + {:stop, :normal, {:completed, state_data[:workflow]}, state_data} """ @spec workflow_completed(term, term, Map) :: {:stop, :normal, {:completed, pid}, Map} @@ -289,35 +293,76 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `state_data` option contains the default state data of the :gen_fsm server ## Return Values - + {:reply, :in_progress, :workflow_completed, state_data} """ @spec build(term, term, Map) :: {:reply, :in_progress, :workflow_completed, Map} - def build(_event, _from, state_data) do - call_builder(state_data) + def build(_event, _from, state_data) do + deployment_repo = state_data[:workflow][:deployment_repo] + + if any_builds_queued?(deployment_repo) do + process_queued_builds(deployment_repo, state_data) + else + call_builder(state_data) + end + end + + defp any_builds_queued?(deployment_repo) do + query = %{deployment_repo: deployment_repo, workflow_completed: false} + workflows = ManagerApi.get_api |> WorkflowAPI.list!(query) + + if length(workflows) > 0 do + workflow_in_progress = workflows |> Enum.find fn(wf) -> + wf.current_step == "build" + end + + if workflow_in_progress do + {true, :build} + else + earliest_pending = workflows |> Enum.min_by fn(wf) -> + wf.inserted_at |> Timex.DateFormat.parse("{RFC1123}") + end + {true, :pending, earliest_pending.workflow_id} + end + end + end + + defp process_queued_builds(deployment_repo, state_data) do + case any_builds_queued?(deployment_repo) do + {true, :build} -> + :timer.sleep Configuration.get_queue_build_delay + process_queued_builds(deployment_repo, state_data) + {true, :pending, workflow_id} -> + %{state_data | workflow: %{workflow_id: workflow_id}} |> call_builder + process_queued_builds(deployment_repo, state_data) + _other -> + {:reply, :in_progress, :workflow_completed, state_data} + end end @spec config(term, term, Map) :: {:reply, :in_progress, :workflow_completed, Map} - def config(_event, _from, state_data) do + def config(_event, _from, state_data) do call_builder(state_data) end def call_builder(state_data) do type = Workflow.get_info(state_data[:workflow])[:current_step] - Logger.debug("#{state_data[:workflow_fsm_prefix]} Requesting #{type}...") + Logger.debug("#{state_data[:workflow_fsm_prefix]} Requesting #{type}...") {messaging_exchange_id, docker_build_etcd_cluster} = DockerHostResolver.next_available workflow_info = Workflow.get_info(state_data[:workflow]) - if workflow_info[:build_messaging_exchange_id] != nil do + + if workflow_info[:build_messaging_exchange_id] do messaging_exchange_id = workflow_info[:build_messaging_exchange_id] Workflow.add_success_notification(state_data[:workflow], "The Workflow request has overriden the default build messaging_exchange_id to #{messaging_exchange_id}") - end + end + cond do docker_build_etcd_cluster == nil -> Workflow.workflow_failed(state_data[:workflow], "Unable to request #{type} - no build clusters are available!") !OpenAperture.ManagerApi.MessagingExchange.exchange_has_modules_of_type?(messaging_exchange_id, "builder") -> Workflow.workflow_failed(state_data[:workflow], "Unable to request #{type} - no Builders are currently accessible in exchange #{messaging_exchange_id}!") - true -> + true -> Workflow.add_success_notification(state_data[:workflow], "Dispatching a #{type} request to exchange #{messaging_exchange_id}, docker build cluster #{docker_build_etcd_cluster["etcd_token"]}...") request = OrchestratorRequest.from_payload(Workflow.get_info(state_data[:workflow])) @@ -356,11 +401,11 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `state_data` option contains the default state data of the :gen_fsm server ## Return Values - + {:reply, :in_progress, :workflow_completed, state_data} """ @spec deploy(term, term, Map) :: {:reply, :in_progress, :workflow_completed, Map} - def deploy(_event, _from, state_data) do + def deploy(_event, _from, state_data) do call_deployer(state_data) end @@ -381,11 +426,11 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do The `state_data` option contains the default state data of the :gen_fsm server ## Return Values - + {:reply, :in_progress, :workflow_completed, state_data} """ @spec deploy_oa(term, term, Map) :: {:reply, :in_progress, :workflow_completed, Map} - def deploy_oa(_event, _from, state_data) do + def deploy_oa(_event, _from, state_data) do call_deployer(state_data) end @@ -425,9 +470,9 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do Workflow.save(state_data[:workflow]) if type == :deploy do - DeployerPublisher.deploy(state_data[:delivery_tag], messaging_exchange_id, OrchestratorRequest.to_payload(request)) + DeployerPublisher.deploy(state_data[:delivery_tag], messaging_exchange_id, OrchestratorRequest.to_payload(request)) else - DeployerPublisher.deploy_oa(state_data[:delivery_tag], messaging_exchange_id, OrchestratorRequest.to_payload(request)) + DeployerPublisher.deploy_oa(state_data[:delivery_tag], messaging_exchange_id, OrchestratorRequest.to_payload(request)) end end end @@ -436,4 +481,4 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do # call back in after it's action has been completed {:reply, :in_progress, :workflow_completed, state_data} end -end \ No newline at end of file +end diff --git a/test/stubs/workflow_api_stub.exs b/test/stubs/workflow_api_stub.exs new file mode 100644 index 0000000..a369637 --- /dev/null +++ b/test/stubs/workflow_api_stub.exs @@ -0,0 +1,58 @@ +defmodule OpenAperture.WorkflowOrchestrator.WorkflowAPIStub do + @lists %{ + pending: [ + %{ + workflow_id: 1, development_repo: "myCloud/myApp", + workflow_completed: false, current_step: "configure", + inserted_at: "Thu, 30 Jul 2015 07:33:44 UTC" + + }, + %{ + workflow_id: 2, development_repo: "myCloud/myApp", + workflow_completed: false, current_step: "configure", + inserted_at: "Thu, 30 Jul 2015 07:33:45 UTC" + } + ], + build: [ + %{ + workflow_id: 1, development_repo: "myCloud/myApp", + workflow_completed: false, current_step: "configure", + inserted_at: "Thu, 30 Jul 2015 07:33:44 UTC" + }, + %{ + workflow_id: 2, development_repo: "myCloud/myApp", + workflow_completed: false, current_step: "build", + inserted_at: "Thu, 31 Jul 2015 07:33:44 UTC" + } + ] + } + + def start(type) do + Agent.start_link fn -> @lists[type] end + end + + def list(agent) do + Agent.get(agent, &(&1)) + end + + def reduce_no_of_entries(agent) do + workflows = list(agent) + + if workflows != [] do + in_build = workflows |> Enum.find fn(wf) -> + wf.current_step == "build" + end + + rest = if in_build do + workflows |> Enum.reject &(&1.current_step == "build") + else + tl(workflows) + end + + Agent.update(agent, fn _data -> rest end) + end + + :ok + end +end + diff --git a/test/workflow_orchestrator/workflow_fsm_test.exs b/test/workflow_orchestrator/workflow_fsm_test.exs index c66069a..61f50d4 100644 --- a/test/workflow_orchestrator/workflow_fsm_test.exs +++ b/test/workflow_orchestrator/workflow_fsm_test.exs @@ -11,60 +11,65 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do alias OpenAperture.WorkflowOrchestrator.Builder.Publisher, as: BuilderPublisher alias OpenAperture.WorkflowOrchestrator.Deployer.Publisher, as: DeployerPublisher alias OpenAperture.WorkflowOrchestrator.Deployer.EtcdClusterMessagingResolver + alias OpenAperture.WorkflowOrchestrator.WorkflowAPIStub # ============================ # start_link tests + setup_all do + Code.require_file("test/stubs/workflow_api_stub.exs") + :ok + end + test "start_link - success" do - :meck.new(Workflow, [:passthrough]) - :meck.expect(Workflow, :create_from_payload, fn _ -> %{} end) - :meck.expect(Workflow, :get_id, fn _ -> "123abc" end) - - payload = %{ - } + :meck.new(Workflow, [:passthrough]) + :meck.expect(Workflow, :create_from_payload, fn _ -> %{} end) + :meck.expect(Workflow, :get_id, fn _ -> "123abc" end) + + payload = %{} {result, fsm} = WorkflowFSM.start_link(payload, "#{UUID.uuid1()}") assert result == :ok assert fsm != nil after - :meck.unload(Workflow) + :meck.unload(Workflow) end test "start_link - failure" do - :meck.new(Workflow, [:passthrough]) - :meck.expect(Workflow, :create_from_payload, fn _ -> {:error, "bad news bears"} end) - - payload = %{ - } + :meck.new(Workflow, [:passthrough]) + :meck.expect(Workflow, :create_from_payload, fn _ -> {:error, "bad news bears"} end) + + payload = %{ + } {result, reason} = WorkflowFSM.start_link(payload, "#{UUID.uuid1()}") assert result == :error assert reason == "bad news bears" after - :meck.unload(Workflow) + :meck.unload(Workflow) end # ============================ # workflow_starting tests test "workflow_starting - completed" do - :meck.new(Workflow, [:passthrough]) - :meck.expect(Workflow, :create_from_payload, fn _ -> %{} end) - :meck.expect(Workflow, :get_id, fn _ -> "123abc" end) - :meck.expect(Workflow, :save, fn _ -> :ok end) - :meck.expect(Workflow, :complete?, fn _ -> true end) + :meck.new(Workflow, [:passthrough]) + :meck.expect(Workflow, :create_from_payload, fn _ -> %{} end) + :meck.expect(Workflow, :get_id, fn _ -> "123abc" end) + :meck.expect(Workflow, :save, fn _ -> :ok end) + :meck.expect(Workflow, :complete?, fn _ -> true end) :meck.expect(Workflow, :failed?, fn _ -> false end) :meck.expect(Workflow, :send_workflow_completed_email, fn _ -> :ok end) - - payload = %{ - } + + payload = %{ + } {:ok, workflow} = WorkflowFSM.start_link(payload, "#{UUID.uuid1()}") {result, workflow_info} = WorkflowFSM.execute(workflow) assert result == :completed assert workflow != nil after - :meck.unload(Workflow) + :meck.unload(Workflow) end test "workflow_starting - completed without any milestones" do @@ -76,7 +81,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.expect(Workflow, :resolve_next_milestone, fn _ -> :workflow_completed end) :meck.expect(Workflow, :failed?, fn _ -> false end) :meck.expect(Workflow, :send_workflow_completed_email, fn _ -> :ok end) - + payload = %{ } @@ -85,31 +90,31 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert result == :completed assert workflow != nil after - :meck.unload(Workflow) - end + :meck.unload(Workflow) + end test "workflow_starting - test email notification" do :meck.new(Workflow, [:passthrough]) :meck.expect(Workflow, :save, fn _ -> :ok end) - :meck.expect(Workflow, :complete?, fn _ -> true end) + :meck.expect(Workflow, :complete?, fn _ -> true end) :meck.expect(Workflow, :failed?, fn _ -> true end) :meck.expect(Workflow, :workflow_failed, fn _,_ -> true end) :meck.expect(Workflow, :send_workflow_completed_email, fn _ -> :ok end) - + payload = %{ } {:reply, :in_progress, :workflow_completed, state_data} = WorkflowFSM.workflow_starting(%{}, %{}, %{workflow: payload}) assert state_data != nil after - :meck.unload(Workflow) + :meck.unload(Workflow) end # ============================ # terminate tests test "terminate" do - assert WorkflowFSM.terminate(:normal, :workflow_completed, %{workflow_fsm_prefix: "[]"}) == :ok + assert WorkflowFSM.terminate(:normal, :workflow_completed, %{workflow_fsm_prefix: "[]"}) == :ok end # ============================ @@ -133,7 +138,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.expect(Workflow, :complete?, fn _ -> true end) :meck.expect(Workflow, :failed?, fn _ -> false end) :meck.expect(Workflow, :send_workflow_completed_email, fn _ -> :ok end) - + payload = %{ } @@ -155,23 +160,31 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.expect(Workflow, :get_info, fn _ -> %{} end) :meck.new(DockerHostResolver, [:passthrough]) - :meck.expect(DockerHostResolver, :next_available, fn -> {nil, nil} end) + :meck.expect(DockerHostResolver, :next_available, fn -> {nil, nil} end) :meck.new(Dispatcher, [:passthrough]) - :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) + :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) + + :meck.new(OpenAperture.ManagerApi.Workflow) + {:ok, wfapi_stub} = WorkflowAPIStub.start(:pending) + :meck.expect(OpenAperture.ManagerApi.Workflow, :list!, fn(_, _) -> + list = WorkflowAPIStub.list(wfapi_stub) + WorkflowAPIStub.reduce_no_of_entries(wfapi_stub) + list + end) - state_data = %{workflow_fsm_prefix: "[]", workflow: %{}} + state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp"}} assert WorkflowFSM.build(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} after :meck.unload(Workflow) :meck.unload(DockerHostResolver) :meck.unload(Dispatcher) - end + end test "build - no builders available" do {:ok, pid} = Agent.start_link(fn -> false end); :meck.new(Workflow, [:passthrough]) - :meck.expect(Workflow, :workflow_failed, fn _, msg -> + :meck.expect(Workflow, :workflow_failed, fn _, msg -> assert msg == "Unable to request build - no Builders are currently accessible in exchange 123!" Agent.update(pid, fn _ -> true end) :ok @@ -184,13 +197,20 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.expect(DockerHostResolver, :next_available, fn -> {123, %{"etcd_token" => "123456789000"}} end) :meck.new(Dispatcher, [:passthrough]) - :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) + :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) :meck.new(OpenAperture.ManagerApi.MessagingExchange, [:passthrough]) :meck.expect(OpenAperture.ManagerApi.MessagingExchange, :exchange_has_modules_of_type?, fn _, _ -> false end) + :meck.new(OpenAperture.ManagerApi.Workflow) + {:ok, wfapi_stub} = WorkflowAPIStub.start(:build) + :meck.expect(OpenAperture.ManagerApi.Workflow, :list!, fn(_, _) -> + list = WorkflowAPIStub.list(wfapi_stub) + WorkflowAPIStub.reduce_no_of_entries(wfapi_stub) + list + end) - state_data = %{workflow_fsm_prefix: "[]", workflow: %{}} + state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp"}} assert WorkflowFSM.build(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} assert Agent.get(pid, &(&1)) after @@ -209,9 +229,17 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.new(OpenAperture.ManagerApi.MessagingExchange, [:passthrough]) :meck.expect(OpenAperture.ManagerApi.MessagingExchange, :exchange_has_modules_of_type?, fn _, _ -> true end) - state_data = %{workflow_fsm_prefix: "[]", workflow: %{}, delivery_tag: "#{UUID.uuid1()}"} + :meck.new(OpenAperture.ManagerApi.Workflow) + {:ok, wfapi_stub} = WorkflowAPIStub.start(:build) + :meck.expect(OpenAperture.ManagerApi.Workflow, :list!, fn(_, _) -> + list = WorkflowAPIStub.list(wfapi_stub) + WorkflowAPIStub.reduce_no_of_entries(wfapi_stub) + list + end) + + state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp"}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(BuilderPublisher, [:passthrough]) - :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] assert messaging_exchange_id == 123 @@ -223,19 +251,19 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(DockerHostResolver, [:passthrough]) :meck.expect(DockerHostResolver, :next_available, fn -> {123, %{"etcd_token" => "123456789000"}} end) - + assert WorkflowFSM.build(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} after - :meck.unload(Workflow) - :meck.unload(DockerHostResolver) + :meck.unload(Workflow) + :meck.unload(DockerHostResolver) :meck.unload(BuilderPublisher) :meck.unload(OpenAperture.ManagerApi.MessagingExchange) - end + end test "build - success, override messaging_exchange_id" do :meck.new(Workflow, [:passthrough]) @@ -246,10 +274,17 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.new(OpenAperture.ManagerApi.MessagingExchange, [:passthrough]) :meck.expect(OpenAperture.ManagerApi.MessagingExchange, :exchange_has_modules_of_type?, fn _, _ -> true end) + :meck.new(OpenAperture.ManagerApi.Workflow) + {:ok, wfapi_stub} = WorkflowAPIStub.start(:pending) + :meck.expect(OpenAperture.ManagerApi.Workflow, :list!, fn(_, _) -> + list = WorkflowAPIStub.list(wfapi_stub) + WorkflowAPIStub.reduce_no_of_entries(wfapi_stub) + list + end) - state_data = %{workflow_fsm_prefix: "[]", workflow: %{}, delivery_tag: "#{UUID.uuid1()}"} + state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp"}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(BuilderPublisher, [:passthrough]) - :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] assert messaging_exchange_id == 789 @@ -261,23 +296,28 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(DockerHostResolver, [:passthrough]) :meck.expect(DockerHostResolver, :next_available, fn -> {123, %{"etcd_token" => "123456789000"}} end) - + assert WorkflowFSM.build(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} after - :meck.unload(Workflow) - :meck.unload(DockerHostResolver) + :meck.unload(Workflow) + :meck.unload(DockerHostResolver) :meck.unload(BuilderPublisher) :meck.unload(OpenAperture.ManagerApi.MessagingExchange) end test "build - success through FSM" do :meck.new(Workflow, [:passthrough]) - :meck.expect(Workflow, :create_from_payload, fn _ -> %{} end) + :meck.expect(Workflow, :create_from_payload, fn(_) -> + %{ + deployment_repo: "myCloud/myApp", + workflow_id: 42 + } + end) :meck.expect(Workflow, :get_id, fn _ -> "123abc" end) :meck.expect(Workflow, :save, fn _ -> :ok end) :meck.expect(Workflow, :complete?, fn _ -> false end) @@ -291,7 +331,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do orig_delivery_tag = "#{UUID.uuid1()}" :meck.new(BuilderPublisher, [:passthrough]) - :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == orig_delivery_tag assert messaging_exchange_id == 123 @@ -303,22 +343,166 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(DockerHostResolver, [:passthrough]) :meck.expect(DockerHostResolver, :next_available, fn -> {123, %{"etcd_token" => "123456789000"}} end) - - payload = %{ - } - + + :meck.new(OpenAperture.ManagerApi.Workflow) + {:ok, wfapi_stub} = WorkflowAPIStub.start(:build) + :meck.expect(OpenAperture.ManagerApi.Workflow, :list!, fn(_, _) -> + list = WorkflowAPIStub.list(wfapi_stub) + WorkflowAPIStub.reduce_no_of_entries(wfapi_stub) + list + end) + + + + payload = %{} + + {:ok, workflow} = WorkflowFSM.start_link(payload, orig_delivery_tag) + {result, workflow_info} = WorkflowFSM.execute(workflow) + assert result == :completed + assert workflow != nil + after + :meck.unload(Workflow) + :meck.unload(BuilderPublisher) + :meck.unload(DockerHostResolver) + :meck.unload(OpenAperture.ManagerApi.MessagingExchange) + end + + test "no queued builds" do + :meck.new(Workflow, [:passthrough]) + :meck.expect(Workflow, :create_from_payload, fn _ -> + %{deployment_repo: "Cloud/MyApp_docker"} + end) + :meck.expect(Workflow, :get_id, fn _ -> "123abc" end) + :meck.expect(Workflow, :save, fn _ -> :ok end) + :meck.expect(Workflow, :complete?, fn _ -> false end) + :meck.expect(Workflow, :get_info, fn _ -> %{} end) + :meck.expect(Workflow, :resolve_next_milestone, fn _ -> :build end) + :meck.expect(Workflow, :failed?, fn _ -> false end) + :meck.expect(Workflow, :add_success_notification, fn _,_ -> :ok end) + :meck.new(OpenAperture.ManagerApi.MessagingExchange, [:passthrough]) + :meck.expect(OpenAperture.ManagerApi.MessagingExchange, :exchange_has_modules_of_type?, fn _, _ -> true end) + + + orig_delivery_tag = "#{UUID.uuid1()}" + :meck.new(BuilderPublisher, [:passthrough]) + :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> end) + + :meck.new(DockerHostResolver, [:passthrough]) + :meck.expect(DockerHostResolver, :next_available, fn -> {123, %{"etcd_token" => "123456789000"}} end) + + :meck.new(OpenAperture.ManagerApi.Workflow) + :meck.expect(OpenAperture.ManagerApi.Workflow, :list!, fn(_, _) -> [] end) + + payload = %{} + + {:ok, workflow} = WorkflowFSM.start_link(payload, orig_delivery_tag) + {result, workflow_info} = WorkflowFSM.execute(workflow) + + assert result == :completed + assert workflow != nil + after + :meck.unload(Workflow) + :meck.unload(BuilderPublisher) + :meck.unload(DockerHostResolver) + :meck.unload(OpenAperture.ManagerApi.MessagingExchange) + end + + test "only pending builds queued up" do + :meck.new(Workflow, [:passthrough]) + :meck.expect(Workflow, :create_from_payload, fn(_) -> + %{ + deployment_repo: "myCloud/myApp", + workflow_id: 42 + } + end) + :meck.expect(Workflow, :get_id, fn _ -> "123abc" end) + :meck.expect(Workflow, :save, fn _ -> :ok end) + :meck.expect(Workflow, :complete?, fn _ -> false end) + :meck.expect(Workflow, :get_info, fn _ -> %{} end) + :meck.expect(Workflow, :resolve_next_milestone, fn _ -> :build end) + :meck.expect(Workflow, :failed?, fn _ -> false end) + :meck.expect(Workflow, :add_success_notification, fn _,_ -> :ok end) + :meck.new(OpenAperture.ManagerApi.MessagingExchange, [:passthrough]) + :meck.expect(OpenAperture.ManagerApi.MessagingExchange, :exchange_has_modules_of_type?, fn _, _ -> true end) + + + orig_delivery_tag = "#{UUID.uuid1()}" + :meck.new(BuilderPublisher, [:passthrough]) + :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> end) + + :meck.new(DockerHostResolver, [:passthrough]) + :meck.expect(DockerHostResolver, :next_available, fn -> {123, %{"etcd_token" => "123456789000"}} end) + + :meck.new(OpenAperture.ManagerApi.Workflow) + {:ok, wfapi_stub} = WorkflowAPIStub.start(:pending) + :meck.expect(OpenAperture.ManagerApi.Workflow, :list!, fn(_, _) -> + list = WorkflowAPIStub.list(wfapi_stub) + WorkflowAPIStub.reduce_no_of_entries(wfapi_stub) + list + end) + + payload = %{} + {:ok, workflow} = WorkflowFSM.start_link(payload, orig_delivery_tag) {result, workflow_info} = WorkflowFSM.execute(workflow) + assert result == :completed assert workflow != nil after - :meck.unload(Workflow) - :meck.unload(BuilderPublisher) + :meck.unload(Workflow) + :meck.unload(BuilderPublisher) + :meck.unload(DockerHostResolver) + :meck.unload(OpenAperture.ManagerApi.MessagingExchange) + end + + test "both a build in progress and a pending build" do + :meck.new(Workflow, [:passthrough]) + :meck.expect(Workflow, :create_from_payload, fn(_) -> + %{ + deployment_repo: "myCloud/myApp", + workflow_id: 42 + } + end) + :meck.expect(Workflow, :get_id, fn _ -> "123abc" end) + :meck.expect(Workflow, :save, fn _ -> :ok end) + :meck.expect(Workflow, :complete?, fn _ -> false end) + :meck.expect(Workflow, :get_info, fn _ -> %{} end) + :meck.expect(Workflow, :resolve_next_milestone, fn _ -> :build end) + :meck.expect(Workflow, :failed?, fn _ -> false end) + :meck.expect(Workflow, :add_success_notification, fn _,_ -> :ok end) + :meck.new(OpenAperture.ManagerApi.MessagingExchange, [:passthrough]) + :meck.expect(OpenAperture.ManagerApi.MessagingExchange, :exchange_has_modules_of_type?, fn _, _ -> true end) + + orig_delivery_tag = "#{UUID.uuid1()}" + :meck.new(BuilderPublisher, [:passthrough]) + :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> end) + + :meck.new(DockerHostResolver, [:passthrough]) + :meck.expect(DockerHostResolver, :next_available, fn -> {123, %{"etcd_token" => "123456789000"}} end) + + :meck.new(OpenAperture.ManagerApi.Workflow) + {:ok, wfapi_stub} = WorkflowAPIStub.start(:build) + :meck.expect(OpenAperture.ManagerApi.Workflow, :list!, fn(_, _) -> + list = WorkflowAPIStub.list(wfapi_stub) + WorkflowAPIStub.reduce_no_of_entries(wfapi_stub) + list + end) + + payload = %{} + + {:ok, workflow} = WorkflowFSM.start_link(payload, orig_delivery_tag) + {result, workflow_info} = WorkflowFSM.execute(workflow) + + assert result == :completed + assert workflow != nil + after + :meck.unload(Workflow) + :meck.unload(BuilderPublisher) :meck.unload(DockerHostResolver) :meck.unload(OpenAperture.ManagerApi.MessagingExchange) end @@ -335,7 +519,8 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.expect(EtcdClusterMessagingResolver, :exchange_for_cluster, fn _ -> nil end) :meck.new(Dispatcher, [:passthrough]) - :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) + :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) + state_data = %{workflow_fsm_prefix: "[]", workflow: %{}} assert WorkflowFSM.deploy(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} @@ -348,7 +533,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do test "deploy - no deployers" do {:ok, pid} = Agent.start_link(fn -> false end); :meck.new(Workflow, [:passthrough]) - :meck.expect(Workflow, :workflow_failed, fn _, msg -> + :meck.expect(Workflow, :workflow_failed, fn _, msg -> assert msg == "Unable to request deploy - no deploy clusters are available in exchange 123!" Agent.update(pid, fn _ -> true end) :ok @@ -359,7 +544,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.expect(EtcdClusterMessagingResolver, :exchange_for_cluster, fn _ -> 123 end) :meck.new(Dispatcher, [:passthrough]) - :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) + :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) :meck.new(OpenAperture.ManagerApi.MessagingExchange, [:passthrough]) :meck.expect(OpenAperture.ManagerApi.MessagingExchange, :exchange_has_modules_of_type?, fn _, _ -> false end) @@ -371,7 +556,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.unload(EtcdClusterMessagingResolver) :meck.unload(Dispatcher) :meck.unload(OpenAperture.ManagerApi.MessagingExchange) - end + end test "deploy - failed - no etcd_token" do :meck.new(Workflow, [:passthrough]) @@ -384,7 +569,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do state_data = %{workflow_fsm_prefix: "[]", workflow: %{}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(DeployerPublisher, [:passthrough]) - :meck.expect(DeployerPublisher, :deploy, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(DeployerPublisher, :deploy, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] assert messaging_exchange_id == 123 @@ -395,16 +580,16 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(EtcdClusterMessagingResolver, [:passthrough]) :meck.expect(EtcdClusterMessagingResolver, :exchange_for_cluster, fn _ -> 123 end) - + assert WorkflowFSM.deploy(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} after - :meck.unload(Workflow) - :meck.unload(EtcdClusterMessagingResolver) + :meck.unload(Workflow) + :meck.unload(EtcdClusterMessagingResolver) :meck.unload(DeployerPublisher) end @@ -415,9 +600,17 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.expect(Workflow, :failed?, fn _ -> false end) :meck.expect(Workflow, :add_success_notification, fn _,_ -> :ok end) - state_data = %{workflow_fsm_prefix: "[]", workflow: %{}, delivery_tag: "#{UUID.uuid1()}"} + :meck.new(OpenAperture.ManagerApi.Workflow) + {:ok, wfapi_stub} = WorkflowAPIStub.start(:build) + :meck.expect(OpenAperture.ManagerApi.Workflow, :list!, fn(_, _) -> + list = WorkflowAPIStub.list(wfapi_stub) + WorkflowAPIStub.reduce_no_of_entries(wfapi_stub) + list + end) + + state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp"}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(DeployerPublisher, [:passthrough]) - :meck.expect(DeployerPublisher, :deploy, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(DeployerPublisher, :deploy, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] assert messaging_exchange_id == 123 @@ -428,21 +621,21 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(EtcdClusterMessagingResolver, [:passthrough]) :meck.expect(EtcdClusterMessagingResolver, :exchange_for_cluster, fn _ -> 123 end) :meck.new(OpenAperture.ManagerApi.MessagingExchange, [:passthrough]) :meck.expect(OpenAperture.ManagerApi.MessagingExchange, :exchange_has_modules_of_type?, fn _, _ -> true end) - + assert WorkflowFSM.deploy(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} after - :meck.unload(Workflow) - :meck.unload(EtcdClusterMessagingResolver) + :meck.unload(Workflow) + :meck.unload(EtcdClusterMessagingResolver) :meck.unload(DeployerPublisher) :meck.unload(OpenAperture.ManagerApi.MessagingExchange) - end + end test "deploy - success, override messaging_exchange_id" do :meck.new(Workflow, [:passthrough]) @@ -454,7 +647,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do state_data = %{workflow_fsm_prefix: "[]", workflow: %{}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(DeployerPublisher, [:passthrough]) - :meck.expect(DeployerPublisher, :deploy, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(DeployerPublisher, :deploy, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] assert messaging_exchange_id == 789 @@ -465,7 +658,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(EtcdClusterMessagingResolver, [:passthrough]) @@ -475,11 +668,11 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert WorkflowFSM.deploy(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} after - :meck.unload(Workflow) - :meck.unload(EtcdClusterMessagingResolver) + :meck.unload(Workflow) + :meck.unload(EtcdClusterMessagingResolver) :meck.unload(DeployerPublisher) :meck.unload(OpenAperture.ManagerApi.MessagingExchange) - end + end test "deploy - success through FSM" do :meck.new(Workflow, [:passthrough]) @@ -494,7 +687,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do orig_delivery_tag = "#{UUID.uuid1()}" :meck.new(DeployerPublisher, [:passthrough]) - :meck.expect(DeployerPublisher, :deploy, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(DeployerPublisher, :deploy, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == orig_delivery_tag assert messaging_exchange_id == 123 @@ -505,12 +698,12 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(EtcdClusterMessagingResolver, [:passthrough]) :meck.expect(EtcdClusterMessagingResolver, :exchange_for_cluster, fn _ -> 123 end) - + payload = %{ } @@ -519,10 +712,10 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert result == :completed assert workflow != nil after - :meck.unload(Workflow) + :meck.unload(Workflow) :meck.unload(EtcdClusterMessagingResolver) :meck.unload(DeployerPublisher) - end + end # ============================ # deploy_oa tests @@ -536,7 +729,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.expect(EtcdClusterMessagingResolver, :exchange_for_cluster, fn _ -> nil end) :meck.new(Dispatcher, [:passthrough]) - :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) + :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) state_data = %{workflow_fsm_prefix: "[]", workflow: %{}} assert WorkflowFSM.deploy_oa(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} @@ -549,7 +742,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do test "deploy_oa - no deployers" do {:ok, pid} = Agent.start_link(fn -> false end); :meck.new(Workflow, [:passthrough]) - :meck.expect(Workflow, :workflow_failed, fn _, msg -> + :meck.expect(Workflow, :workflow_failed, fn _, msg -> assert msg == "Unable to request deploy - no deploy clusters are available in exchange 123!" Agent.update(pid, fn _ -> true end) :ok @@ -560,7 +753,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.expect(EtcdClusterMessagingResolver, :exchange_for_cluster, fn _ -> 123 end) :meck.new(Dispatcher, [:passthrough]) - :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) + :meck.expect(Dispatcher, :acknowledge, fn _ -> :ok end) :meck.new(OpenAperture.ManagerApi.MessagingExchange, [:passthrough]) :meck.expect(OpenAperture.ManagerApi.MessagingExchange, :exchange_has_modules_of_type?, fn _, _ -> false end) @@ -585,7 +778,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do state_data = %{workflow_fsm_prefix: "[]", workflow: %{}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(DeployerPublisher, [:passthrough]) - :meck.expect(DeployerPublisher, :deploy_oa, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(DeployerPublisher, :deploy_oa, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] assert messaging_exchange_id == 123 @@ -596,16 +789,16 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(EtcdClusterMessagingResolver, [:passthrough]) :meck.expect(EtcdClusterMessagingResolver, :exchange_for_cluster, fn _ -> 123 end) - + assert WorkflowFSM.deploy_oa(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} after - :meck.unload(Workflow) - :meck.unload(EtcdClusterMessagingResolver) + :meck.unload(Workflow) + :meck.unload(EtcdClusterMessagingResolver) :meck.unload(DeployerPublisher) end @@ -618,7 +811,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do state_data = %{workflow_fsm_prefix: "[]", workflow: %{}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(DeployerPublisher, [:passthrough]) - :meck.expect(DeployerPublisher, :deploy_oa, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(DeployerPublisher, :deploy_oa, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] assert messaging_exchange_id == 123 @@ -629,18 +822,16 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(EtcdClusterMessagingResolver, [:passthrough]) :meck.expect(EtcdClusterMessagingResolver, :exchange_for_cluster, fn _ -> 123 end) :meck.new(OpenAperture.ManagerApi.MessagingExchange, [:passthrough]) :meck.expect(OpenAperture.ManagerApi.MessagingExchange, :exchange_has_modules_of_type?, fn _, _ -> true end) - - assert WorkflowFSM.deploy_oa(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} - after - :meck.unload(Workflow) - :meck.unload(EtcdClusterMessagingResolver) + + :meck.unload(Workflow) + :meck.unload(EtcdClusterMessagingResolver) :meck.unload(DeployerPublisher) :meck.unload(OpenAperture.ManagerApi.MessagingExchange) end @@ -655,7 +846,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do state_data = %{workflow_fsm_prefix: "[]", workflow: %{}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(DeployerPublisher, [:passthrough]) - :meck.expect(DeployerPublisher, :deploy_oa, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(DeployerPublisher, :deploy_oa, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] assert messaging_exchange_id == 789 @@ -666,7 +857,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(EtcdClusterMessagingResolver, [:passthrough]) @@ -676,8 +867,8 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert WorkflowFSM.deploy_oa(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} after - :meck.unload(Workflow) - :meck.unload(EtcdClusterMessagingResolver) + :meck.unload(Workflow) + :meck.unload(EtcdClusterMessagingResolver) :meck.unload(DeployerPublisher) :meck.unload(OpenAperture.ManagerApi.MessagingExchange) end @@ -695,7 +886,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do orig_delivery_tag = "#{UUID.uuid1()}" :meck.new(DeployerPublisher, [:passthrough]) - :meck.expect(DeployerPublisher, :deploy_oa, fn delivery_tag, messaging_exchange_id, payload -> + :meck.expect(DeployerPublisher, :deploy_oa, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == orig_delivery_tag assert messaging_exchange_id == 123 @@ -706,22 +897,21 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do assert payload[:workflow_orchestration_exchange_id] == "1" assert payload[:workflow_orchestration_broker_id] == "1" assert payload[:orchestration_queue_name] == "workflow_orchestration" - :ok + :ok end) :meck.new(EtcdClusterMessagingResolver, [:passthrough]) :meck.expect(EtcdClusterMessagingResolver, :exchange_for_cluster, fn _ -> 123 end) - - payload = %{ - } + + payload = %{} {:ok, workflow} = WorkflowFSM.start_link(payload, orig_delivery_tag) {result, workflow_info} = WorkflowFSM.execute(workflow) assert result == :completed assert workflow != nil after - :meck.unload(Workflow) + :meck.unload(Workflow) :meck.unload(EtcdClusterMessagingResolver) :meck.unload(DeployerPublisher) - end + end end From 870fc4fa5fad5d57fe89857d07d572968a8e631d Mon Sep 17 00:00:00 2001 From: aandrieiev Date: Tue, 4 Aug 2015 14:46:54 +0200 Subject: [PATCH 2/3] Adjust code according to the format of API replies --- lib/workflow_orchestrator/workflow_fsm.ex | 10 +++---- test/stubs/workflow_api_stub.exs | 30 +++++++++---------- .../workflow_fsm_test.exs | 21 ++++++------- 3 files changed, 28 insertions(+), 33 deletions(-) diff --git a/lib/workflow_orchestrator/workflow_fsm.ex b/lib/workflow_orchestrator/workflow_fsm.ex index 3225654..0641f6d 100755 --- a/lib/workflow_orchestrator/workflow_fsm.ex +++ b/lib/workflow_orchestrator/workflow_fsm.ex @@ -313,16 +313,16 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do if length(workflows) > 0 do workflow_in_progress = workflows |> Enum.find fn(wf) -> - wf.current_step == "build" + wf["current_step"] == "build" end if workflow_in_progress do {true, :build} else earliest_pending = workflows |> Enum.min_by fn(wf) -> - wf.inserted_at |> Timex.DateFormat.parse("{RFC1123}") + wf["inserted_at"] |> Timex.DateFormat.parse("{RFC1123}") end - {true, :pending, earliest_pending.workflow_id} + {true, :pending, earliest_pending["id"]} end end end @@ -332,8 +332,8 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do {true, :build} -> :timer.sleep Configuration.get_queue_build_delay process_queued_builds(deployment_repo, state_data) - {true, :pending, workflow_id} -> - %{state_data | workflow: %{workflow_id: workflow_id}} |> call_builder + {true, :pending, id} -> + %{state_data | workflow: %{id: id}} |> call_builder process_queued_builds(deployment_repo, state_data) _other -> {:reply, :in_progress, :workflow_completed, state_data} diff --git a/test/stubs/workflow_api_stub.exs b/test/stubs/workflow_api_stub.exs index a369637..28bb603 100644 --- a/test/stubs/workflow_api_stub.exs +++ b/test/stubs/workflow_api_stub.exs @@ -2,27 +2,26 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowAPIStub do @lists %{ pending: [ %{ - workflow_id: 1, development_repo: "myCloud/myApp", - workflow_completed: false, current_step: "configure", - inserted_at: "Thu, 30 Jul 2015 07:33:44 UTC" - + "id" => "1", "development_repo" => "myCloud/myApp", + "workflow_completed" => false, "current_step" => "configure", + "inserted_at" => "Thu, 30 Jul 2015 07:33:44 UTC" }, %{ - workflow_id: 2, development_repo: "myCloud/myApp", - workflow_completed: false, current_step: "configure", - inserted_at: "Thu, 30 Jul 2015 07:33:45 UTC" + "id" => "2", "development_repo" => "myCloud/myApp", + "workflow_completed" => false, "current_step" => "configure", + "inserted_at" => "Thu, 30 Jul 2015 07:33:45 UTC" } ], build: [ %{ - workflow_id: 1, development_repo: "myCloud/myApp", - workflow_completed: false, current_step: "configure", - inserted_at: "Thu, 30 Jul 2015 07:33:44 UTC" + "id" => "1", "development_repo" => "myCloud/myApp", + "workflow_completed" => false, "current_step" => "configure", + "inserted_at" => "Thu, 30 Jul 2015 07:33:44 UTC" }, %{ - workflow_id: 2, development_repo: "myCloud/myApp", - workflow_completed: false, current_step: "build", - inserted_at: "Thu, 31 Jul 2015 07:33:44 UTC" + "id" => "2", "development_repo" => "myCloud/myApp", + "workflow_completed" => false, "current_step" => "build", + "inserted_at" => "Thu, 30 Jul 2015 07:13:45 UTC" } ] } @@ -40,11 +39,11 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowAPIStub do if workflows != [] do in_build = workflows |> Enum.find fn(wf) -> - wf.current_step == "build" + wf["current_step"] == "build" end rest = if in_build do - workflows |> Enum.reject &(&1.current_step == "build") + workflows |> Enum.reject &(&1["current_step"] == "build") else tl(workflows) end @@ -55,4 +54,3 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowAPIStub do :ok end end - diff --git a/test/workflow_orchestrator/workflow_fsm_test.exs b/test/workflow_orchestrator/workflow_fsm_test.exs index 61f50d4..eb56021 100644 --- a/test/workflow_orchestrator/workflow_fsm_test.exs +++ b/test/workflow_orchestrator/workflow_fsm_test.exs @@ -12,7 +12,6 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do alias OpenAperture.WorkflowOrchestrator.Deployer.Publisher, as: DeployerPublisher alias OpenAperture.WorkflowOrchestrator.Deployer.EtcdClusterMessagingResolver alias OpenAperture.WorkflowOrchestrator.WorkflowAPIStub - # ============================ # start_link tests @@ -173,7 +172,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do list end) - state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp"}} + state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp_docker"}} assert WorkflowFSM.build(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} after :meck.unload(Workflow) @@ -210,7 +209,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do list end) - state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp"}} + state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp_docker"}} assert WorkflowFSM.build(:workflow_completed, nil, state_data) == {:reply, :in_progress, :workflow_completed, state_data} assert Agent.get(pid, &(&1)) after @@ -237,7 +236,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do list end) - state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp"}, delivery_tag: "#{UUID.uuid1()}"} + state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp_docker"}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(BuilderPublisher, [:passthrough]) :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] @@ -282,7 +281,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do list end) - state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp"}, delivery_tag: "#{UUID.uuid1()}"} + state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp_docker"}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(BuilderPublisher, [:passthrough]) :meck.expect(BuilderPublisher, :build, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] @@ -314,7 +313,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.new(Workflow, [:passthrough]) :meck.expect(Workflow, :create_from_payload, fn(_) -> %{ - deployment_repo: "myCloud/myApp", + deployment_repo: "myCloud/myApp_docker", workflow_id: 42 } end) @@ -357,8 +356,6 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do list end) - - payload = %{} {:ok, workflow} = WorkflowFSM.start_link(payload, orig_delivery_tag) @@ -375,7 +372,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do test "no queued builds" do :meck.new(Workflow, [:passthrough]) :meck.expect(Workflow, :create_from_payload, fn _ -> - %{deployment_repo: "Cloud/MyApp_docker"} + %{deployment_repo: "myCloud/MyApp_docker"} end) :meck.expect(Workflow, :get_id, fn _ -> "123abc" end) :meck.expect(Workflow, :save, fn _ -> :ok end) @@ -416,7 +413,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.new(Workflow, [:passthrough]) :meck.expect(Workflow, :create_from_payload, fn(_) -> %{ - deployment_repo: "myCloud/myApp", + deployment_repo: "myCloud/myApp_docker", workflow_id: 42 } end) @@ -464,7 +461,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do :meck.new(Workflow, [:passthrough]) :meck.expect(Workflow, :create_from_payload, fn(_) -> %{ - deployment_repo: "myCloud/myApp", + deployment_repo: "myCloud/myApp_docker", workflow_id: 42 } end) @@ -608,7 +605,7 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSMTest do list end) - state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp"}, delivery_tag: "#{UUID.uuid1()}"} + state_data = %{workflow_fsm_prefix: "[]", workflow: %{deployment_repo: "myCloud/myApp_docker"}, delivery_tag: "#{UUID.uuid1()}"} :meck.new(DeployerPublisher, [:passthrough]) :meck.expect(DeployerPublisher, :deploy, fn delivery_tag, messaging_exchange_id, payload -> assert delivery_tag == state_data[:delivery_tag] From 5615de18dafb4bc29a52430b48e0e113c12cd082 Mon Sep 17 00:00:00 2001 From: aandrieiev Date: Tue, 4 Aug 2015 14:53:03 +0200 Subject: [PATCH 3/3] Add @spec references to new WorkflowFSM functions --- lib/workflow_orchestrator/workflow_fsm.ex | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/workflow_orchestrator/workflow_fsm.ex b/lib/workflow_orchestrator/workflow_fsm.ex index 0641f6d..a282154 100755 --- a/lib/workflow_orchestrator/workflow_fsm.ex +++ b/lib/workflow_orchestrator/workflow_fsm.ex @@ -307,6 +307,8 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do end end + @doc false + @spec any_builds_queued?(String.t) :: {Boolean, Atom} | {Boolean, Atom, String.t} | nil defp any_builds_queued?(deployment_repo) do query = %{deployment_repo: deployment_repo, workflow_completed: false} workflows = ManagerApi.get_api |> WorkflowAPI.list!(query) @@ -327,6 +329,8 @@ defmodule OpenAperture.WorkflowOrchestrator.WorkflowFSM do end end + @doc false + @spec process_queued_builds(String.t, Map) :: {Atom, Atom, Atom, Map} defp process_queued_builds(deployment_repo, state_data) do case any_builds_queued?(deployment_repo) do {true, :build} ->