diff --git a/airflow/airflow.go b/airflow/airflow.go index 1e809f7ba..012ec19c9 100644 --- a/airflow/airflow.go +++ b/airflow/airflow.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "strings" airflowversions "github.com/astronomer/astro-cli/airflow_versions" "github.com/astronomer/astro-cli/pkg/fileutil" @@ -198,5 +199,9 @@ func repositoryName(name string) string { // imageName creates an airflow image name func ImageName(name, tag string) string { + // If name already looks like a full image (has a slash and a colon), use as-is + if strings.Contains(name, "/") && strings.Contains(name, ":") { + return name + } return fmt.Sprintf("%s:%s", repositoryName(name), tag) } diff --git a/cmd/cloud/deploy.go b/cmd/cloud/deploy.go index 23c50c5e9..17a951aa9 100644 --- a/cmd/cloud/deploy.go +++ b/cmd/cloud/deploy.go @@ -13,21 +13,21 @@ import ( ) var ( - forceDeploy bool - forcePrompt bool - saveDeployConfig bool - pytest bool - parse bool - dags bool - waitForDeploy bool - image bool - dagsPath string - pytestFile string - envFile string - imageName string - deploymentName string - deployDescription string - deployExample = ` + forceDeploy bool + forcePrompt bool + saveDeployConfig bool + pytest bool + parse bool + dags bool + waitForDeploy bool + image bool + dagsPath string + pytestFile string + envFile string + imageName string + deployDeploymentName string + deployDescription string + deployExample = ` Specify the ID of the Deployment on Astronomer you would like to deploy this project to: $ astro deploy @@ -68,7 +68,7 @@ func NewDeployCmd() *cobra.Command { cmd.Flags().BoolVarP(&dags, "dags", "d", false, "Push only DAGs to your Astro Deployment") cmd.Flags().BoolVarP(&image, "image", "", false, "Push only an image to your Astro Deployment. If you have DAG Deploy enabled your DAGs will not be affected.") cmd.Flags().StringVar(&dagsPath, "dags-path", "", "If set deploy dags from this path instead of the dags from working directory") - cmd.Flags().StringVarP(&deploymentName, "deployment-name", "n", "", "Name of the deployment to deploy to") + cmd.Flags().StringVarP(&deployDeploymentName, "deployment-name", "n", "", "Name of the deployment to deploy to") cmd.Flags().BoolVar(&parse, "parse", false, "Succeed only if all DAGs in your Astro project parse without errors") cmd.Flags().BoolVarP(&waitForDeploy, "wait", "w", false, "Wait for the Deployment to become healthy before ending the command") cmd.Flags().MarkHidden("dags-path") //nolint:errcheck @@ -146,7 +146,7 @@ func deploy(cmd *cobra.Command, args []string) error { Pytest: pytestFile, EnvFile: envFile, ImageName: imageName, - DeploymentName: deploymentName, + DeploymentName: deployDeploymentName, Prompt: forcePrompt, Dags: dags, Image: image, diff --git a/cmd/cloud/deployment.go b/cmd/cloud/deployment.go index 35a973374..d3f1db639 100644 --- a/cmd/cloud/deployment.go +++ b/cmd/cloud/deployment.go @@ -1,6 +1,7 @@ package cloud import ( + "context" "fmt" "io" "strings" @@ -14,8 +15,10 @@ import ( "github.com/astronomer/astro-cli/cloud/organization" "github.com/astronomer/astro-cli/cloud/team" "github.com/astronomer/astro-cli/cloud/user" + "github.com/astronomer/astro-cli/config" "github.com/astronomer/astro-cli/pkg/httputil" "github.com/astronomer/astro-cli/pkg/input" + remoteexec "github.com/astronomer/astro-cli/remote-exec" "github.com/pkg/errors" "github.com/spf13/cobra" ) @@ -31,6 +34,7 @@ var ( label string runtimeVersion string deploymentID string + deploymentName string logsKeyword string forceDelete bool description string @@ -100,6 +104,8 @@ var ( errFlag = errors.New("--deployment-file can not be used with other arguments") errInvalidExecutor = errors.New("not a valid executor") errInvalidCloudProvider = errors.New("not a valid cloud provider. It can only be gcp, azure or aws") + agentToken string + remoteAPIURL string ) func newDeploymentRootCmd(out io.Writer) *cobra.Command { @@ -127,6 +133,7 @@ func newDeploymentRootCmd(out io.Writer) *cobra.Command { newDeploymentTokenRootCmd(out), newDeploymentHibernateCmd(), newDeploymentWakeUpCmd(), + newDeploymentRemoteExecCmd(), ) return cmd } @@ -594,6 +601,32 @@ func newDeploymentWakeUpCmd() *cobra.Command { return cmd } +func newDeploymentRemoteExecCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "remote-exec", + Aliases: []string{"re"}, + Short: "Run remote execution agents locally", + Long: "Run remote execution agents locally (worker, dag-processor, triggerer).", + } + cmd.PersistentFlags().StringVar(&workspaceID, "workspace-id", "", "workspace assigned to deployment") + cmd.PersistentFlags().StringVar(&deploymentID, "deployment-id", "", "deployment ID for remote execution") + cmd.PersistentFlags().StringVar(&deploymentName, "deployment-name", "", "deployment name for remote execution") + cmd.PersistentFlags().StringVar(&agentToken, "agent-token", "", "agent token for remote execution") + cmd.PersistentFlags().StringVar(&remoteAPIURL, "remote-api-url", "", "remote API URL for remote execution (optional)") + cmd.MarkPersistentFlagRequired("agent-token") + cmd.AddCommand( + newRemoteExecInitCmd(), + newRemoteExecStartCmd(astroCoreClient), + newRemoteExecRunCmd(), + newRemoteExecPSCmd(), + newRemoteExecStopCmd(), + newRemoteExecKillCmd(), + newRemoteExecRestartCmd(astroCoreClient), + newRemoteExecBashCmd(), + ) + return cmd +} + func deploymentList(cmd *cobra.Command, out io.Writer) error { ws, err := coalesceWorkspace() if err != nil { @@ -1409,3 +1442,338 @@ func getOverrideUntil(until, forDuration string) (*time.Time, error) { } return nil, nil } + +// getDeploymentWithRemoteExecution gets deployment information including remote execution details +func getDeploymentWithRemoteExecution(ws, deploymentID, deploymentName string) (*astrocore.Deployment, error) { + // First get the platform deployment to get the deployment ID + platformDeployment, err := deployment.GetDeployment(ws, deploymentID, deploymentName, false, nil, platformCoreClient, astroCoreClient) + if err != nil { + return nil, err + } + + // Now get the core deployment with remote execution information + c, err := config.GetCurrentContext() + if err != nil { + return nil, err + } + + resp, err := astroCoreClient.GetDeploymentWithResponse(context.Background(), c.Organization, platformDeployment.Id) + if err != nil { + return nil, err + } + + err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body) + if err != nil { + return nil, err + } + + return resp.JSON200, nil +} + +func newRemoteExecInitCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "init", + Short: "Initialize remote execution environment", + Long: "Initialize the remote execution environment with required configuration.", + RunE: func(cmd *cobra.Command, args []string) error { + ws, err := coalesceWorkspace() + if err != nil { + return errors.Wrap(err, "failed to find a valid workspace") + } + currentDeployment, err := getDeploymentWithRemoteExecution(ws, deploymentID, deploymentName) + if err != nil { + return err + } + imageName := fmt.Sprintf("%s/airflow:latest", currentDeployment.ReleaseName) + remoteAPIURL := "" + if currentDeployment.RemoteExecution != nil { + remoteAPIURL = currentDeployment.RemoteExecution.RemoteApiUrl + } + if remoteAPIURL == "" { + return fmt.Errorf("remote API URL is required but could not be determined from deployment. Please provide it using --remote-api-url flag") + } + config := remoteexec.Config{ + AgentToken: agentToken, + RemoteAPIURL: remoteAPIURL, + DeploymentName: deploymentName, + ImageName: imageName, + } + re, err := remoteexec.Init(config) + if err != nil { + return err + } + fmt.Println("Remote execution environment initialized successfully") + fmt.Printf("Compose file: %s\n", re.GetComposeFilePath()) + return nil + }, + } + return cmd +} + +func newRemoteExecStartCmd(coreClient interface{}) *cobra.Command { + cmd := &cobra.Command{ + Use: "start", + Short: "Start remote execution agents", + Long: "Start the remote execution agents (worker, dag-processor, triggerer).", + RunE: func(cmd *cobra.Command, args []string) error { + ws, err := coalesceWorkspace() + if err != nil { + return errors.Wrap(err, "failed to find a valid workspace") + } + currentDeployment, err := getDeploymentWithRemoteExecution(ws, deploymentID, deploymentName) + if err != nil { + return err + } + imageName := fmt.Sprintf("%s/airflow:latest", currentDeployment.ReleaseName) + remoteAPIURL := "" + if currentDeployment.RemoteExecution != nil { + remoteAPIURL = currentDeployment.RemoteExecution.RemoteApiUrl + } + if remoteAPIURL == "" { + return fmt.Errorf("remote API URL is required but could not be determined from deployment. Please provide it using --remote-api-url flag") + } + config := remoteexec.Config{ + AgentToken: agentToken, + RemoteAPIURL: remoteAPIURL, + DeploymentName: deploymentName, + ImageName: imageName, + } + re, err := remoteexec.Init(config) + if err != nil { + return err + } + return re.Start("", "", false, 60) + }, + } + return cmd +} + +func newRemoteExecStopCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "stop", + Short: "Stop remote execution agents", + Long: "Stop the remote execution agents and clean up containers.", + RunE: func(cmd *cobra.Command, args []string) error { + ws, err := coalesceWorkspace() + if err != nil { + return errors.Wrap(err, "failed to find a valid workspace") + } + currentDeployment, err := getDeploymentWithRemoteExecution(ws, deploymentID, deploymentName) + if err != nil { + return err + } + imageName := fmt.Sprintf("%s/airflow:latest", currentDeployment.ReleaseName) + remoteAPIURL := "" + if currentDeployment.RemoteExecution != nil { + remoteAPIURL = currentDeployment.RemoteExecution.RemoteApiUrl + } + if remoteAPIURL == "" { + return fmt.Errorf("remote API URL is required but could not be determined from deployment. Please provide it using --remote-api-url flag") + } + config := remoteexec.Config{ + AgentToken: agentToken, + RemoteAPIURL: remoteAPIURL, + DeploymentName: deploymentName, + ImageName: imageName, + } + re, err := remoteexec.Init(config) + if err != nil { + return err + } + return re.Stop(false) + }, + } + return cmd +} + +func newRemoteExecPSCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "ps", + Short: "Show status of remote execution agents", + Long: "Display the current status of all remote execution agent containers.", + RunE: func(cmd *cobra.Command, args []string) error { + ws, err := coalesceWorkspace() + if err != nil { + return errors.Wrap(err, "failed to find a valid workspace") + } + currentDeployment, err := getDeploymentWithRemoteExecution(ws, deploymentID, deploymentName) + if err != nil { + return err + } + imageName := fmt.Sprintf("%s/airflow:latest", currentDeployment.ReleaseName) + remoteAPIURL := "" + if currentDeployment.RemoteExecution != nil { + remoteAPIURL = currentDeployment.RemoteExecution.RemoteApiUrl + } + if remoteAPIURL == "" { + return fmt.Errorf("remote API URL is required but could not be determined from deployment. Please provide it using --remote-api-url flag") + } + config := remoteexec.Config{ + AgentToken: agentToken, + RemoteAPIURL: remoteAPIURL, + DeploymentName: deploymentName, + ImageName: imageName, + } + re, err := remoteexec.Init(config) + if err != nil { + return err + } + return re.PS() + }, + } + return cmd +} + +func newRemoteExecRunCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "run", + Short: "Run a command in a remote execution agent", + Long: "Execute a command inside one of the remote execution agent containers.", + Args: cobra.MinimumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ws, err := coalesceWorkspace() + if err != nil { + return errors.Wrap(err, "failed to find a valid workspace") + } + currentDeployment, err := getDeploymentWithRemoteExecution(ws, deploymentID, deploymentName) + if err != nil { + return err + } + imageName := fmt.Sprintf("%s/airflow:latest", currentDeployment.ReleaseName) + remoteAPIURL := "" + if currentDeployment.RemoteExecution != nil { + remoteAPIURL = currentDeployment.RemoteExecution.RemoteApiUrl + } + if remoteAPIURL == "" { + return fmt.Errorf("remote API URL is required but could not be determined from deployment. Please provide it using --remote-api-url flag") + } + config := remoteexec.Config{ + AgentToken: agentToken, + RemoteAPIURL: remoteAPIURL, + DeploymentName: deploymentName, + ImageName: imageName, + } + re, err := remoteexec.Init(config) + if err != nil { + return err + } + return re.Run("", args, "") + }, + } + return cmd +} + +func newRemoteExecRestartCmd(coreClient interface{}) *cobra.Command { + cmd := &cobra.Command{ + Use: "restart", + Short: "Restart remote execution agents", + Long: "Restart all remote execution agent containers.", + RunE: func(cmd *cobra.Command, args []string) error { + ws, err := coalesceWorkspace() + if err != nil { + return errors.Wrap(err, "failed to find a valid workspace") + } + currentDeployment, err := getDeploymentWithRemoteExecution(ws, deploymentID, deploymentName) + if err != nil { + return err + } + imageName := fmt.Sprintf("%s/airflow:latest", currentDeployment.ReleaseName) + remoteAPIURL := "" + if currentDeployment.RemoteExecution != nil { + remoteAPIURL = currentDeployment.RemoteExecution.RemoteApiUrl + } + if remoteAPIURL == "" { + return fmt.Errorf("remote API URL is required but could not be determined from deployment. Please provide it using --remote-api-url flag") + } + config := remoteexec.Config{ + AgentToken: agentToken, + RemoteAPIURL: remoteAPIURL, + DeploymentName: deploymentName, + ImageName: imageName, + } + re, err := remoteexec.Init(config) + if err != nil { + return err + } + return re.Restart() + }, + } + return cmd +} + +func newRemoteExecBashCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "bash", + Short: "Open a bash shell in a remote execution agent", + Long: "Open an interactive bash shell inside a remote execution agent container.", + RunE: func(cmd *cobra.Command, args []string) error { + ws, err := coalesceWorkspace() + if err != nil { + return errors.Wrap(err, "failed to find a valid workspace") + } + currentDeployment, err := getDeploymentWithRemoteExecution(ws, deploymentID, deploymentName) + if err != nil { + return err + } + imageName := fmt.Sprintf("%s/airflow:latest", currentDeployment.ReleaseName) + remoteAPIURL := "" + if currentDeployment.RemoteExecution != nil { + remoteAPIURL = currentDeployment.RemoteExecution.RemoteApiUrl + } + if remoteAPIURL == "" { + return fmt.Errorf("remote API URL is required but could not be determined from deployment. Please provide it using --remote-api-url flag") + } + config := remoteexec.Config{ + AgentToken: agentToken, + RemoteAPIURL: remoteAPIURL, + DeploymentName: deploymentName, + ImageName: imageName, + } + re, err := remoteexec.Init(config) + if err != nil { + return err + } + return re.Bash("") + }, + } + return cmd +} + +func newRemoteExecKillCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "kill", + Short: "Force kill remote execution agents", + Long: "Force kill all remote execution agent containers.", + RunE: func(cmd *cobra.Command, args []string) error { + ws, err := coalesceWorkspace() + if err != nil { + return errors.Wrap(err, "failed to find a valid workspace") + } + currentDeployment, err := getDeploymentWithRemoteExecution(ws, deploymentID, deploymentName) + if err != nil { + return err + } + imageName := fmt.Sprintf("%s/airflow:latest", currentDeployment.ReleaseName) + remoteAPIURL := "" + if currentDeployment.RemoteExecution != nil { + remoteAPIURL = currentDeployment.RemoteExecution.RemoteApiUrl + } + if remoteAPIURL == "" { + return fmt.Errorf("remote API URL is required but could not be determined from deployment. Please provide it using --remote-api-url flag") + } + config := remoteexec.Config{ + AgentToken: agentToken, + RemoteAPIURL: remoteAPIURL, + DeploymentName: deploymentName, + ImageName: imageName, + } + re, err := remoteexec.Init(config) + if err != nil { + return err + } + // Stop with force (equivalent to kill) + return re.Stop(true) + }, + } + return cmd +} diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 000000000..6e80baa02 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,33 @@ +x-common-env-vars: &common-env-vars + AIRFLOW__CORE__LOAD_EXAMPLES: "True" + ASTRO_AGENT_CLIENT_DISALLOW_DEFAULT_XCOM_BACKEND: "False" + ASTRO_AGENT_CLIENT_DISALLOW_DEFAULT_SECRET_BACKEND: "False" + ASTRO_AGENT_CLIENT_LOG_LEVEL: "DEBUG" + ASTRO_AGENT_CLIENT_API_SERVER: "http://localhost:8080" + ASTRO_AGENT_CLIENT_API_TOKEN: "test-token" + +services: + dag-processor: + image: test/airflow:latest + command: ["dag_processor"] + environment: + <<: *common-env-vars + ASTRO_AGENT_CLIENT_AGENT_ID: "dag-processor-b5e4f3a6-90ab-4fd0-b5cd-444f8910471e" + restart: always + worker: + image: test/airflow:latest + environment: + <<: *common-env-vars + AIRFLOW__OPENLINEAGE__DISABLED: "true" + ASTRO_AGENT_CLIENT_SYNC_SLOTS: "10" + ASTRO_AGENT_CLIENT_AGENT_ID: "worker-91f2f012-3851-45aa-a28b-7f3721c27e66" + restart: always + triggerer: + image: test/airflow:latest + command: ["triggerer"] + environment: + <<: *common-env-vars + AIRFLOW__OPENLINEAGE__DISABLED: "true" + ASTRO_AGENT_CLIENT_TRIGGERER__ASYNC_SLOTS: "1000" + ASTRO_AGENT_CLIENT_AGENT_ID: "triggerer-6c59f9bc-26e9-4b1f-ae26-068321663b5c" + restart: always diff --git a/remote-exec/docker.go b/remote-exec/docker.go new file mode 100644 index 000000000..f5b62ff84 --- /dev/null +++ b/remote-exec/docker.go @@ -0,0 +1,203 @@ +package remoteexec + +import ( + "fmt" + "os" + "path/filepath" + "runtime" + "time" + + "github.com/astronomer/astro-cli/airflow" + airflowTypes "github.com/astronomer/astro-cli/airflow/types" + "github.com/google/uuid" +) + +const ( + composeFile = "remote-docker-compose.yaml" +) + +// RemoteExecDocker wraps the airflow DockerCompose functionality for remote execution +type RemoteExecDocker struct { + *airflow.DockerCompose + composeFile string +} + +// Config holds configuration for remote execution services +type Config struct { + ProjectName string + ImageName string + ComposeFile string + EnvFile string + AgentToken string + RemoteAPIURL string + DPID string + WorkerID string + TriggererID string + DeploymentName string +} + +// Init initializes a new RemoteExecDocker instance by reusing airflow DockerCompose +func Init(config Config) (*RemoteExecDocker, error) { + // Set default compose file if not provided + if config.ComposeFile == "" { + config.ComposeFile = composeFile + } + + // Copy the remote-exec docker-compose file to the current working directory + _, filename, _, _ := runtime.Caller(0) + dir := filepath.Dir(filename) + sourceComposeFile := filepath.Join(dir, "remote-docker-compose.yaml") + + // Read the source compose file + sourceContent, err := os.ReadFile(sourceComposeFile) + if err != nil { + return nil, fmt.Errorf("failed to read remote-exec compose file: %w", err) + } + + // Write to the current working directory + err = os.WriteFile(config.ComposeFile, sourceContent, 0644) + if err != nil { + return nil, fmt.Errorf("failed to write compose file to current directory: %w", err) + } + + // Set environment variables for the remote execution services + if config.AgentToken != "" { + os.Setenv("AGENT_TOKEN", config.AgentToken) + } else { + return nil, fmt.Errorf("agent token is required for remote exec docker compose") + } + if config.RemoteAPIURL != "" { + os.Setenv("ASTRO_REMOTE_API_URL", config.RemoteAPIURL) + } + if config.ImageName != "" { + os.Setenv("REMOTE_EXEC_IMAGE", config.ImageName) + } + + if config.DPID != "" { + os.Setenv("DP_ID", config.DPID) + } else { + os.Setenv("DP_ID", "dag-processor-"+uuid.NewString()) + } + if config.WorkerID != "" { + os.Setenv("WORKER_ID", config.WorkerID) + } else { + os.Setenv("WORKER_ID", "worker-"+uuid.NewString()) + } + if config.TriggererID != "" { + os.Setenv("TRIGGERER_ID", config.TriggererID) + } else { + os.Setenv("TRIGGERER_ID", "triggerer-"+uuid.NewString()) + } + + // Initialize the underlying airflow DockerCompose + dockerCompose, err := airflow.DockerComposeInit(".", config.EnvFile, "Dockerfile", config.ImageName) + if err != nil { + return nil, fmt.Errorf("failed to initialize DockerCompose: %w", err) + } + + return &RemoteExecDocker{ + DockerCompose: dockerCompose, + composeFile: config.ComposeFile, + }, nil +} + +// Start starts the remote execution services using the custom compose file +func (r *RemoteExecDocker) Start(imageName, buildSecretString string, noCache bool, waitTime time.Duration) error { + // Use the custom compose file instead of the default one + return r.DockerCompose.Start(imageName, "", r.composeFile, buildSecretString, noCache, false, waitTime, nil) +} + +// Stop stops the remote execution services +func (r *RemoteExecDocker) Stop(waitForExit bool) error { + return r.DockerCompose.Stop(waitForExit) +} + +// PS shows the status of remote execution services +func (r *RemoteExecDocker) PS() error { + return r.DockerCompose.PS() +} + +// Logs shows logs from the remote execution services +func (r *RemoteExecDocker) Logs(follow bool, serviceNames ...string) error { + return r.DockerCompose.Logs(follow, serviceNames...) +} + +// Run runs a command in a specific service container +func (r *RemoteExecDocker) Run(serviceName string, args []string, user string) error { + // For remote exec, we'll run the command in the specified service + // The airflow Run method runs in the webserver by default, so we need to adapt + if serviceName == "" { + serviceName = "worker" // Default to worker service for remote exec + } + + // Use the underlying Run method but with service-specific logic + return r.DockerCompose.Run(args, user) +} + +// Bash opens a bash shell in a specific service container +func (r *RemoteExecDocker) Bash(serviceName string) error { + if serviceName == "" { + serviceName = "worker" // Default to worker service for remote exec + } + return r.DockerCompose.Bash(serviceName) +} + +// Build builds the remote execution image using the airflow image handler +func (r *RemoteExecDocker) Build(dockerfile, buildSecretString string, noCache bool) error { + // Create an image handler for building + imageHandler := airflow.ImageHandlerInit("") + buildConfig := airflowTypes.ImageBuildConfig{ + Path: ".", + NoCache: noCache, + } + return imageHandler.Build(dockerfile, buildSecretString, buildConfig) +} + +// Push pushes the remote execution image to a registry +func (r *RemoteExecDocker) Push(remoteImage, username, token string, getImageRepoSha bool) (string, error) { + imageHandler := airflow.ImageHandlerInit("") + return imageHandler.Push(remoteImage, username, token, getImageRepoSha) +} + +// Pull pulls the remote execution image from a registry +func (r *RemoteExecDocker) Pull(remoteImage, username, token string) error { + imageHandler := airflow.ImageHandlerInit("") + return imageHandler.Pull(remoteImage, username, token) +} + +// GetImageLabels gets all labels from the remote execution image +func (r *RemoteExecDocker) GetImageLabels() (map[string]string, error) { + imageHandler := airflow.ImageHandlerInit("") + return imageHandler.ListLabels() +} + +// GetImageLabel gets a specific label from the remote execution image +func (r *RemoteExecDocker) GetImageLabel(labelName string) (string, error) { + imageHandler := airflow.ImageHandlerInit("") + return imageHandler.GetLabel("", labelName) +} + +// DoesImageExist checks if the remote execution image exists +func (r *RemoteExecDocker) DoesImageExist() error { + imageHandler := airflow.ImageHandlerInit("") + return imageHandler.DoesImageExist("") +} + +// Restart restarts the remote execution services +func (r *RemoteExecDocker) Restart() error { + // Stop and then start the services + err := r.Stop(false) + if err != nil { + return fmt.Errorf("failed to stop services: %w", err) + } + + return r.Start("", "", false, 0) +} + +// GetComposeFilePath returns the full path to the compose file +func (r *RemoteExecDocker) GetComposeFilePath() string { + // Get the directory where this Go file is located + _, filename, _, _ := runtime.Caller(0) + dir := filepath.Dir(filename) + return filepath.Join(dir, r.composeFile) +} diff --git a/remote-exec/remote-docker-compose.yaml b/remote-exec/remote-docker-compose.yaml new file mode 100644 index 000000000..cbcfdb416 --- /dev/null +++ b/remote-exec/remote-docker-compose.yaml @@ -0,0 +1,33 @@ +x-common-env-vars: &common-env-vars + AIRFLOW__CORE__LOAD_EXAMPLES: "True" + ASTRO_AGENT_CLIENT_DISALLOW_DEFAULT_XCOM_BACKEND: "False" + ASTRO_AGENT_CLIENT_DISALLOW_DEFAULT_SECRET_BACKEND: "False" + ASTRO_AGENT_CLIENT_LOG_LEVEL: "DEBUG" + ASTRO_AGENT_CLIENT_API_SERVER: "${ASTRO_REMOTE_API_URL}" + ASTRO_AGENT_CLIENT_API_TOKEN: "${AGENT_TOKEN}" + +services: + dag-processor: + image: ${REMOTE_EXEC_IMAGE} + command: ["dag_processor"] + environment: + <<: *common-env-vars + ASTRO_AGENT_CLIENT_AGENT_ID: "${DP_ID}" + restart: always + worker: + image: ${REMOTE_EXEC_IMAGE} + environment: + <<: *common-env-vars + AIRFLOW__OPENLINEAGE__DISABLED: "true" + ASTRO_AGENT_CLIENT_SYNC_SLOTS: "10" + ASTRO_AGENT_CLIENT_AGENT_ID: "${WORKER_ID}" + restart: always + triggerer: + image: ${REMOTE_EXEC_IMAGE} + command: ["triggerer"] + environment: + <<: *common-env-vars + AIRFLOW__OPENLINEAGE__DISABLED: "true" + ASTRO_AGENT_CLIENT_TRIGGERER__ASYNC_SLOTS: "1000" + ASTRO_AGENT_CLIENT_AGENT_ID: "${TRIGGERER_ID}" + restart: always