Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 90 additions & 2 deletions runtime/kubernetes/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
)

// InspectContainer inspects the pipeline container.
Expand Down Expand Up @@ -74,8 +75,43 @@ func (c *client) RemoveContainer(ctx context.Context, ctn *pipeline.Container) e
// RunContainer creates and starts the pipeline container.
func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *pipeline.Build) error {
c.Logger.Tracef("running container %s", ctn.ID)

field := fields.Set{
"involvedObject.name": fields.EscapeValue(c.Pod.ObjectMeta.Name),
"involvedObject.namespace": fields.EscapeValue(c.config.Namespace),
"involvedObject.fieldPath": fmt.Sprintf("spec.container{%s}", fields.EscapeValue(ctn.ID)),
}
fieldSelector := field.AsSelector()

// create options for watching the container events
opts := metav1.ListOptions{
FieldSelector: fieldSelector.String(),
Watch: true,
}

// Call Kubernetes API to watch for Image Pull Errors
// nolint: contextcheck // ignore non-inherited new context
eventWatch, err := c.Kubernetes.CoreV1().Events(c.config.Namespace).Watch(context.Background(), opts)
if err != nil {
return fmt.Errorf(
"unable to watch events for pod %s and container %s",
c.Pod.ObjectMeta.Name,
ctn.ID,
)
}

defer eventWatch.Stop()

// validate the container image
err = c.CreateImage(ctx, ctn)
if err != nil {
return err
}

var _image string

// parse image from step
_image, err := image.ParseWithError(ctn.Image)
_image, err = image.ParseWithError(ctn.Image)
if err != nil {
return err
}
Expand All @@ -99,7 +135,49 @@ func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *p
return err
}

return nil
for {
// capture new result from the channel
//
// https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#Interface
result := <-eventWatch.ResultChan()

// events get deleted after some time so ignore them
if result.Type == watch.Deleted {
continue
}

// convert the object from the result to a pod
event := result.Object.(*v1.Event)

// check if the event mentions the target image
if !(strings.Contains(event.Message, ctn.Image) || strings.Contains(event.Message, _image)) {
// if the relevant messages does not include our image
// it is probably for "kubernetes/pause:latest"
// or it is a generic message that is basically useless like:
// event.Reason => event.Message
// Failed => Error: ErrImagePull
// BackOff => Error: ImagePullBackOff
continue
}

// TODO: probably need a timeout of some kind...

switch event.Reason {
// examples: event.Reason => event.Message
case "Failed", "BackOff":
// Failed => Failed to pull image "image:tag": <containerd message>
// BackOff => Back-off pulling image "image:tag"
return fmt.Errorf("failed to run container %s in %s: %s", ctn.ID, c.Pod.ObjectMeta.Name, event.Message)
case "Pulled":
// Pulled => Successfully pulled image "image:tag" in <time>
return nil
case "Pulling":
// Pulling => Pulling image "image:tag"
break
default:
break
}
}
}

// SetupContainer prepares the image for the pipeline container.
Expand Down Expand Up @@ -135,6 +213,11 @@ func (c *client) SetupContainer(ctx context.Context, ctn *pipeline.Container) er
case constants.PullAlways:
// set the pod container pull policy to always
container.ImagePullPolicy = v1.PullAlways
// validate ctn.Image
err := c.CreateImage(ctx, ctn)
if err != nil {
return err
}
case constants.PullNever:
// set the pod container pull policy to never
container.ImagePullPolicy = v1.PullNever
Expand All @@ -150,6 +233,11 @@ func (c *client) SetupContainer(ctx context.Context, ctn *pipeline.Container) er
default:
// default the pod container pull policy to if not present
container.ImagePullPolicy = v1.PullIfNotPresent
// validate ctn.Image
err := c.CreateImage(ctx, ctn)
if err != nil {
return err
}
}

// fill in the VolumeMounts including workspaceMount
Expand Down
19 changes: 18 additions & 1 deletion runtime/kubernetes/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,28 +100,45 @@ func TestKubernetes_RunContainer(t *testing.T) {
pipeline *pipeline.Build
pod *v1.Pod
volumes []string
events []*v1.Event
}{
{
failure: false,
container: _container,
pipeline: _stages,
pod: _pod,
events: _podEvents,
},
{
failure: false,
container: _container,
pipeline: _steps,
pod: _pod,
events: _podEvents,
},
{
failure: true,
container: _container,
pipeline: _steps,
pod: _pod,
events: _ctnFailureEvents,
},
}

// run tests
for _, test := range tests {
_engine, err := NewMock(test.pod)
_engine, _watch, err := newMockWithWatch(test.pod, "events")
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}

go func() {
// simulate adding events to the watcher
for _, event := range test.events {
_watch.Add(event.DeepCopyObject())
}
}()

if len(test.volumes) > 0 {
_engine.config.Volumes = test.volumes
}
Expand Down
16 changes: 15 additions & 1 deletion runtime/kubernetes/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/go-vela/types/constants"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/internal/image"
)

const imagePatch = `
Expand All @@ -29,8 +30,21 @@ const imagePatch = `

// CreateImage creates the pipeline container image.
func (c *client) CreateImage(ctx context.Context, ctn *pipeline.Container) error {
c.Logger.Tracef("no-op: creating image for container %s", ctn.ID)
c.Logger.Tracef("creating image for container %s", ctn.ID)

// parse/validate image from container
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/image#ParseWithError
_, err := image.ParseWithError(ctn.Image)
if err != nil {
return err
}

// Kubernetes does not have an API to make sure it can access an image,
// so we have to query the appropriate docker registry ourselves.
// TODO: query docker registry for the image (if possible)
// this might require retrieving the pullSecrets from k8s
// or have the admin add a Vela accessible secret as well.
return nil
}

Expand Down
69 changes: 69 additions & 0 deletions runtime/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kubernetes

import (
"fmt"
"testing"

"github.com/go-vela/types/pipeline"
Expand All @@ -14,6 +15,8 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
"k8s.io/client-go/tools/clientcmd/api/latest"
"k8s.io/client-go/tools/reference"
)

func TestKubernetes_New(t *testing.T) {
Expand Down Expand Up @@ -152,6 +155,72 @@ var (
},
},
}
_podRef, _ = reference.GetReference(latest.Scheme, _pod)
_podEvents = []*v1.Event{
{
InvolvedObject: *_podRef,
Reason: "Pulled",
Message: "Successfully pulled image \"postgres:12-alpine\" in 1.123456789s",
Type: "Normal",
Count: 1,
ObjectMeta: metav1.ObjectMeta{Name: "postgres", Namespace: "test"},
},
{
InvolvedObject: *_podRef,
Reason: "Pulled",
Message: "Successfully pulled image \"target/vela-git:v0.4.0\" in 1.123456789s",
Type: "Normal",
Count: 1,
ObjectMeta: metav1.ObjectMeta{Name: "vela-git", Namespace: "test"},
},
{
InvolvedObject: *_podRef,
Reason: "Pulled",
Message: "Successfully pulled image \"alpine:latest\" in 1.123456789s",
Type: "Normal",
Count: 1,
ObjectMeta: metav1.ObjectMeta{Name: "alpine", Namespace: "test"},
},
}
_ctnFailureEvents = []*v1.Event{
{
InvolvedObject: *_podRef,
Reason: "Failed",
Message: fmt.Sprintf(
"Failed to pull image \"%s\": rpc error: "+
"code = NotFound desc = failed to pull and unpack image \"%s\": "+
"failed to resolve reference \"%s\": %s: not found",
_container.Image, _container.Image, _container.Image, _container.Image,
),
Type: "Warning",
Count: 1,
ObjectMeta: metav1.ObjectMeta{Name: "ctn-event-1234", Namespace: "test"},
},
{
InvolvedObject: *_podRef,
Reason: "Failed",
Message: "Error: ErrImagePull",
Type: "Warning",
Count: 1,
ObjectMeta: metav1.ObjectMeta{Name: "ctn-event-1235", Namespace: "test"},
},
{
InvolvedObject: *_podRef,
Reason: "BackOff",
Message: fmt.Sprintf("Back-off pulling image \"%s\"", _container.Image),
Type: "Normal",
Count: 1,
ObjectMeta: metav1.ObjectMeta{Name: "ctn-event-1236", Namespace: "test"},
},
{
InvolvedObject: *_podRef,
Reason: "Failed",
Message: "Error: ImagePullBackOff",
Type: "Warning",
Count: 1,
ObjectMeta: metav1.ObjectMeta{Name: "ctn-event-1237", Namespace: "test"},
},
}

_stages = &pipeline.Build{
Version: "1",
Expand Down