Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions src/cmd/cli/command/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,17 @@ func makeComposeUpCmd() *cobra.Command {
term.Info("Tailing logs for", tailSource, "; press Ctrl+C to detach:")

tailOptions := newTailOptionsForDeploy(deploy.Etag, since, verbose)
serviceStates, err := cli.TailAndMonitor(ctx, project, provider, time.Duration(waitTimeout)*time.Second, tailOptions)
serviceStates, logCache, err := cli.TailAndMonitor(ctx, project, provider, time.Duration(waitTimeout)*time.Second, tailOptions)
if err != nil {
handleTailAndMonitorErr(ctx, err, client, cli.DebugConfig{
logs := logCache.Get()
handleTailAndMonitorErr(ctx, err, logs, client, cli.DebugConfig{
Deployment: deploy.Etag,
ModelId: modelId,
Project: project,
Provider: provider,
Since: since,
})

return err
}

Expand Down Expand Up @@ -224,7 +226,7 @@ func handleComposeUpErr(ctx context.Context, err error, project *compose.Project
return cli.InteractiveDebugForClientError(ctx, client, project, err)
}

func handleTailAndMonitorErr(ctx context.Context, err error, client *cliClient.GrpcClient, debugConfig cli.DebugConfig) {
func handleTailAndMonitorErr(ctx context.Context, err error, logs []string, client *cliClient.GrpcClient, debugConfig cli.DebugConfig) {
var errDeploymentFailed cliClient.ErrDeploymentFailed
if errors.As(err, &errDeploymentFailed) {
// Tail got canceled because of deployment failure: prompt to show the debugger
Expand All @@ -235,7 +237,13 @@ func handleTailAndMonitorErr(ctx context.Context, err error, client *cliClient.G
if nonInteractive {
printDefangHint("To debug the deployment, do:", debugConfig.String())
} else {
track.Evt("Debug Prompted", P("failedServices", debugConfig.FailedServices), P("etag", debugConfig.Deployment), P("reason", errDeploymentFailed))
props := track.MakeEventLogProperties("logs", logs)
props = append(props,
P("failedServices", debugConfig.FailedServices),
P("etag", debugConfig.Deployment),
P("reason", errDeploymentFailed),
)
track.Evt("Debug Prompted", props...)

// Call the AI debug endpoint using the original command context (not the tail ctx which is canceled)
if nil != cli.InteractiveDebugDeployment(ctx, client, debugConfig) {
Expand All @@ -248,7 +256,7 @@ func handleTailAndMonitorErr(ctx context.Context, err error, client *cliClient.G
}

func newTailOptionsForDeploy(deployment string, since time.Time, verbose bool) cli.TailOptions {
return cli.TailOptions{
tailOpt := cli.TailOptions{
Deployment: deployment,
LogType: logs.LogTypeAll,
// TODO: Move this to playground provider GetDeploymentStatus
Expand All @@ -266,6 +274,8 @@ func newTailOptionsForDeploy(deployment string, since time.Time, verbose bool) c
Since: since,
Verbose: verbose,
}

return tailOpt
}

func flushWarnings() {
Expand Down Expand Up @@ -616,7 +626,9 @@ func handleLogsCmd(cmd *cobra.Command, args []string) error {
Verbose: verbose,
Follow: follow,
}
return cli.Tail(cmd.Context(), provider, projectName, tailOptions)

_, err = cli.Tail(cmd.Context(), provider, projectName, tailOptions)
return err
}

func setupComposeCommand() *cobra.Command {
Expand Down
52 changes: 52 additions & 0 deletions src/pkg/circularbuffer/circularbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package circularbuffer

// BufferInterface abstracts the buffer operations
type BufferInterface[T any] interface {
Add(item T)
Get() []T
}

type CircularBuffer[T any] struct {
size int
entries int
index int
data []T
}

func (c *CircularBuffer[T]) Add(item T) {
c.entries++
c.data[c.index] = item
c.index = (c.index + 1) % c.size
}

func (c *CircularBuffer[T]) Get() []T {
maxItems := min(c.entries, c.size)
items := make([]T, maxItems)
startIdx := c.index

// the c.index points to the next write position (ie. oldest entry) if the buffer is full,
// otherwise if the buffer is not full then c.index does not point to the older entry so we
// need to start from index 0
if c.entries < c.size {
startIdx = 0
}

// Collect items in chronological order
for i := range maxItems {
idx := (startIdx + i) % c.size
items[i] = c.data[idx]
}
return items
}

func NewCircularBuffer[T any](bufferSize int) *CircularBuffer[T] {
if bufferSize <= 0 {
panic("failed to created a circular buffer: cannot have zero elements")
}
return &CircularBuffer[T]{
size: bufferSize,
entries: 0,
index: 0,
data: make([]T, bufferSize),
}
}
23 changes: 23 additions & 0 deletions src/pkg/circularbuffer/circularbuffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package circularbuffer

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestCircularBuffer(t *testing.T) {
buffer := NewCircularBuffer[int](3)

assert.Equal(t, []int{}, buffer.Get())
buffer.Add(1)
assert.Equal(t, []int{1}, buffer.Get())
buffer.Add(2)
assert.Equal(t, []int{1, 2}, buffer.Get())
buffer.Add(3)
assert.Equal(t, []int{1, 2, 3}, buffer.Get())
buffer.Add(4)
assert.Equal(t, []int{2, 3, 4}, buffer.Get())
buffer.Add(5)
assert.Equal(t, []int{3, 4, 5}, buffer.Get())
}
2 changes: 1 addition & 1 deletion src/pkg/cli/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TailAndWaitForCD(ctx context.Context, provider client.Provider, projectName

// blocking call to tail
var tailErr error
if err := streamLogs(ctx, provider, projectName, tailOptions, logEntryPrintHandler); err != nil {
if _, err := streamLogs(ctx, provider, projectName, tailOptions, logEntryPrintHandler); err != nil {
term.Debug("Tail stopped with", err, errors.Unwrap(err))
if !errors.Is(err, context.Canceled) {
tailErr = err
Expand Down
2 changes: 1 addition & 1 deletion src/pkg/cli/composeUp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func TestComposeUpStops(t *testing.T) {
timer := time.AfterFunc(time.Second, func() { provider.subscribeStream.Send(tt.svcFailed, tt.subscribeErr) })
t.Cleanup(func() { timer.Stop() })
}
_, err = TailAndMonitor(ctx, project, provider, -1, TailOptions{Deployment: resp.Etag})
_, _, err = TailAndMonitor(ctx, project, provider, -1, TailOptions{Deployment: resp.Etag})
if err != nil {
if err.Error() != tt.wantError {
t.Errorf("expected error: %v, got: %v", tt.wantError, err)
Expand Down
2 changes: 1 addition & 1 deletion src/pkg/cli/estimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func GeneratePreview(ctx context.Context, project *compose.Project, client clien
Verbose: true,
}

err = streamLogs(ctx, previewProvider, project.Name, tailOptions, func(entry *defangv1.LogEntry, options *TailOptions, t *term.Term) error {
_, err = streamLogs(ctx, previewProvider, project.Name, tailOptions, func(entry *defangv1.LogEntry, options *TailOptions, t *term.Term) error {
if strings.HasPrefix(entry.Message, "Preview succeeded") {
return io.EOF
} else if strings.HasPrefix(entry.Message, "Preview failed") {
Expand Down
30 changes: 17 additions & 13 deletions src/pkg/cli/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/DefangLabs/defang/src/pkg"
"github.com/DefangLabs/defang/src/pkg/circularbuffer"
"github.com/DefangLabs/defang/src/pkg/cli/client"
"github.com/DefangLabs/defang/src/pkg/dryrun"
"github.com/DefangLabs/defang/src/pkg/logs"
Expand Down Expand Up @@ -143,7 +144,7 @@ func (cerr CancelError) Unwrap() error {
return cerr.error
}

func Tail(ctx context.Context, provider client.Provider, projectName string, options TailOptions) error {
func Tail(ctx context.Context, provider client.Provider, projectName string, options TailOptions) (circularbuffer.BufferInterface[string], error) {
if options.LogType == logs.LogTypeUnspecified {
options.LogType = logs.LogTypeAll
}
Expand All @@ -167,7 +168,7 @@ func Tail(ctx context.Context, provider client.Provider, projectName string, opt
}

if dryrun.DoDryRun {
return dryrun.ErrDryRun
return nil, dryrun.ErrDryRun
}

return streamLogs(ctx, provider, projectName, options, logEntryPrintHandler)
Expand Down Expand Up @@ -208,7 +209,7 @@ type LogEntryHandler func(*defangv1.LogEntry, *TailOptions, *term.Term) error

const DefaultTailLimit = 100

func streamLogs(ctx context.Context, provider client.Provider, projectName string, options TailOptions, handler LogEntryHandler) error {
func streamLogs(ctx context.Context, provider client.Provider, projectName string, options TailOptions, handler LogEntryHandler) (circularbuffer.BufferInterface[string], error) {
var sinceTs, untilTs *timestamppb.Timestamp
if pkg.IsValidTime(options.Since) {
sinceTs = timestamppb.New(options.Since)
Expand Down Expand Up @@ -247,7 +248,7 @@ func streamLogs(ctx context.Context, provider client.Provider, projectName strin

serverStream, err := provider.QueryLogs(ctx, tailRequest)
if err != nil {
return err
return nil, err
}

ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -313,16 +314,17 @@ func streamLogs(ctx context.Context, provider client.Provider, projectName strin
return receiveLogs(ctx, provider, projectName, tailRequest, serverStream, &options, doSpinner, handler)
}

func receiveLogs(ctx context.Context, provider client.Provider, projectName string, tailRequest *defangv1.TailRequest, serverStream client.ServerStream[defangv1.TailResponse], options *TailOptions, doSpinner bool, handler LogEntryHandler) error {
func receiveLogs(ctx context.Context, provider client.Provider, projectName string, tailRequest *defangv1.TailRequest, serverStream client.ServerStream[defangv1.TailResponse], options *TailOptions, doSpinner bool, handler LogEntryHandler) (circularbuffer.BufferInterface[string], error) {
logCache := circularbuffer.NewCircularBuffer[string](30)
skipDuplicate := false
var err error
for {
if !serverStream.Receive() {
if errors.Is(serverStream.Err(), context.Canceled) || errors.Is(serverStream.Err(), context.DeadlineExceeded) {
return &CancelError{TailOptions: *options, error: serverStream.Err(), ProjectName: projectName}
return logCache, &CancelError{TailOptions: *options, error: serverStream.Err(), ProjectName: projectName}
}
if errors.Is(serverStream.Err(), io.EOF) {
return serverStream.Err()
return nil, serverStream.Err()
}

// Reconnect on Error: internal: stream error: stream ID 5; INTERNAL_ERROR; received from peer
Expand All @@ -333,13 +335,13 @@ func receiveLogs(ctx context.Context, provider client.Provider, projectName stri
spaces, _ = term.Warnf("Reconnecting...\r") // overwritten below
}
if err := provider.DelayBeforeRetry(ctx); err != nil {
return err
return logCache, err
}
tailRequest.Since = timestamppb.New(options.Since)
serverStream, err = provider.QueryLogs(ctx, tailRequest)
if err != nil {
term.Debug("Reconnect failed:", err)
return err
return logCache, err
}
if !options.Raw {
term.Printf("%*s", spaces, "\r") // clear the "reconnecting" message
Expand All @@ -348,21 +350,21 @@ func receiveLogs(ctx context.Context, provider client.Provider, projectName stri
continue
}

return serverStream.Err() // returns nil on EOF
return logCache, serverStream.Err() // returns nil on EOF
}
msg := serverStream.Msg()

if msg == nil {
continue
}

if err = handleLogEntryMsgs(msg, doSpinner, skipDuplicate, options, handler); err != nil {
return err
if err = handleLogEntryMsgs(msg, logCache, doSpinner, skipDuplicate, options, handler); err != nil {
return nil, err
}
}
}

func handleLogEntryMsgs(msg *defangv1.TailResponse, doSpinner bool, skipDuplicate bool, options *TailOptions, handler LogEntryHandler) error {
func handleLogEntryMsgs(msg *defangv1.TailResponse, logCache circularbuffer.BufferInterface[string], doSpinner bool, skipDuplicate bool, options *TailOptions, handler LogEntryHandler) error {
for _, e := range msg.Entries {
// Replace service progress messages with our own spinner
if doSpinner && isProgressDot(e.Message) {
Expand All @@ -382,6 +384,8 @@ func handleLogEntryMsgs(msg *defangv1.TailResponse, doSpinner bool, skipDuplicat
options.Since = ts
}

logCache.Add(e.Message)

err := handler(e, options, term.DefaultTerm)
if err != nil {
term.Debug("Ending tail loop", err)
Expand Down
10 changes: 6 additions & 4 deletions src/pkg/cli/tailAndMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/DefangLabs/defang/src/pkg"
"github.com/DefangLabs/defang/src/pkg/circularbuffer"
"github.com/DefangLabs/defang/src/pkg/cli/client"
"github.com/DefangLabs/defang/src/pkg/cli/compose"
"github.com/DefangLabs/defang/src/pkg/term"
Expand All @@ -17,7 +18,7 @@ import (

const targetServiceState = defangv1.ServiceState_DEPLOYMENT_COMPLETED

func TailAndMonitor(ctx context.Context, project *compose.Project, provider client.Provider, waitTimeout time.Duration, tailOptions TailOptions) (ServiceStates, error) {
func TailAndMonitor(ctx context.Context, project *compose.Project, provider client.Provider, waitTimeout time.Duration, tailOptions TailOptions) (ServiceStates, circularbuffer.BufferInterface[string], error) {
tailOptions.Follow = true
if tailOptions.Deployment == "" {
panic("tailOptions.Deployment must be a valid deployment ID")
Expand Down Expand Up @@ -67,8 +68,9 @@ func TailAndMonitor(ctx context.Context, project *compose.Project, provider clie
}()

// blocking call to tail
var tailErr error
if err := Tail(tailCtx, provider, project.Name, tailOptions); err != nil {
var err, tailErr error
var logCache circularbuffer.BufferInterface[string]
if logCache, err = Tail(tailCtx, provider, project.Name, tailOptions); err != nil {
term.Debug("Tail stopped with", err, errors.Unwrap(err))

if connect.CodeOf(err) == connect.CodePermissionDenied {
Expand Down Expand Up @@ -99,7 +101,7 @@ func TailAndMonitor(ctx context.Context, project *compose.Project, provider clie
}
}

return serviceStates, errors.Join(cdErr, svcErr, tailErr)
return serviceStates, logCache, errors.Join(cdErr, svcErr, tailErr)
}

func CanMonitorService(service compose.ServiceConfig) bool {
Expand Down
10 changes: 5 additions & 5 deletions src/pkg/cli/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestTail(t *testing.T) {
},
}

err := Tail(t.Context(), p, projectName, TailOptions{Verbose: true}) // Output host
_, err := Tail(t.Context(), p, projectName, TailOptions{Verbose: true}) // Output host
if err != io.EOF {
t.Errorf("Tail() error = %v, want io.EOF", err)
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestUTC(t *testing.T) {
localMock = localMock.MockTimestamp(localTime)

// Start the terminal for local time test
err := Tail(t.Context(), localMock, projectName, TailOptions{Verbose: true}) // Output host
_, err := Tail(t.Context(), localMock, projectName, TailOptions{Verbose: true}) // Output host
if err != nil {
t.Errorf("Tail() error = %v, want io.EOF", err)
}
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestUTC(t *testing.T) {
utcMock := &mockTailProvider{}
utcMock = utcMock.MockTimestamp(utcTime)

err = Tail(t.Context(), utcMock, projectName, TailOptions{Verbose: true})
_, err = Tail(t.Context(), utcMock, projectName, TailOptions{Verbose: true})
if err != nil {
t.Errorf("Tail() error = %v, want io.EOF", err)
}
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestTailError(t *testing.T) {
mock := &mockQueryErrorProvider{
TailStreamError: tt.err,
}
err := Tail(t.Context(), mock, "project", tailOptions)
_, err := Tail(t.Context(), mock, "project", tailOptions)
if err != nil {
if err.Error() != tt.wantError {
t.Errorf("Tail() error = %q, want: %q", err.Error(), tt.wantError)
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestTailContext(t *testing.T) {
time.AfterFunc(10*time.Millisecond, func() {
mock.tailStream.Send(nil, tt.cause)
})
err := Tail(ctx, mock, "project", tailOptions)
_, err := Tail(ctx, mock, "project", tailOptions)
if err.Error() != tt.wantError {
t.Errorf("Tail() error = %q, want: %q", err.Error(), tt.wantError)
}
Expand Down
3 changes: 2 additions & 1 deletion src/pkg/mcp/tools/default_tool_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"strconv"

"github.com/DefangLabs/defang/src/pkg/circularbuffer"
"github.com/DefangLabs/defang/src/pkg/cli"
cliClient "github.com/DefangLabs/defang/src/pkg/cli/client"
"github.com/DefangLabs/defang/src/pkg/cli/compose"
Expand Down Expand Up @@ -49,7 +50,7 @@ func (DefaultToolCLI) ComposeUp(ctx context.Context, project *compose.Project, c
return cli.ComposeUp(ctx, project, client, provider, uploadMode, mode)
}

func (DefaultToolCLI) Tail(ctx context.Context, provider cliClient.Provider, project *compose.Project, options cli.TailOptions) error {
func (DefaultToolCLI) Tail(ctx context.Context, provider cliClient.Provider, project *compose.Project, options cli.TailOptions) (circularbuffer.BufferInterface[string], error) {
return cli.Tail(ctx, provider, project.Name, options)
}

Expand Down
Loading
Loading