Skip to content

Commit 96e3cbf

Browse files
authored
gosdk: adds scheduler ABI support & example with delay filter (#36)
1 parent fb646a6 commit 96e3cbf

File tree

8 files changed

+226
-1
lines changed

8 files changed

+226
-1
lines changed

go/delay.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"time"
7+
8+
"github.com/envoyproxy/dynamic-modules-examples/go/gosdk"
9+
)
10+
11+
type (
12+
// delayFilterConfig implements [gosdk.HttpFilterConfig].
13+
delayFilterConfig struct{}
14+
// delayFilter implements [gosdk.HttpFilter].
15+
//
16+
// This filter demostrates how to use the scheduler to delay the request processing,
17+
// and how to use goroutines to perform the asynchronous operations.
18+
delayFilter struct {
19+
onRequestHeaders time.Time
20+
delayLapsed time.Duration
21+
}
22+
)
23+
24+
// Destroy implements [gosdk.HttpFilterConfig].
25+
func (p delayFilterConfig) Destroy() {}
26+
27+
// NewFilter implements [gosdk.HttpFilterConfig].
28+
func (p delayFilterConfig) NewFilter() gosdk.HttpFilter { return &delayFilter{} }
29+
30+
// Destroy implements [gosdk.HttpFilter].
31+
func (p *delayFilter) Destroy() {}
32+
33+
// RequestHeaders implements [gosdk.HttpFilter].
34+
func (p *delayFilter) RequestHeaders(e gosdk.EnvoyHttpFilter, endOfStream bool) gosdk.RequestHeadersStatus {
35+
// Check if the headers contain the "do-delay" header to trigger the delay.
36+
if _, ok := e.GetRequestHeader("do-delay"); !ok {
37+
// If the header is not present, continue the request processing.
38+
fmt.Println("gosdk: RequestHeaders, do-delay header not found, continuing request processing")
39+
return gosdk.RequestHeadersStatusContinue
40+
}
41+
42+
schduler := e.NewScheduler()
43+
now := time.Now()
44+
p.onRequestHeaders = now
45+
go func() {
46+
// Simulate some delay.
47+
time.Sleep(2 * time.Second)
48+
// Commit the event to continue the request processing.
49+
schduler.Commit(0)
50+
}()
51+
fmt.Printf("gosdk: RequestHeaders, delaying request processing for 2 seconds at %s\n", now)
52+
return gosdk.RequestHeadersStatusStopIteration
53+
}
54+
55+
// Sheduled implements gosdk.HttpFilter.
56+
func (p *delayFilter) Sheduled(e gosdk.EnvoyHttpFilter, eventID uint64) {
57+
if eventID != 0 {
58+
panic("unexpected eventID in Sheduled: " + strconv.Itoa(int(eventID)))
59+
}
60+
fmt.Println("gosdk: Sheduled, continuing request processing after delay")
61+
p.delayLapsed = time.Since(p.onRequestHeaders)
62+
// We can insert some headers at this phase.
63+
e.SetRequestHeader("delay-filter-on-scheduled", []byte("yes"))
64+
// Then continue the request processing.
65+
e.ContinueRequest()
66+
}
67+
68+
// RequestBody implements [gosdk.HttpFilter].
69+
func (p *delayFilter) RequestBody(e gosdk.EnvoyHttpFilter, endOfStream bool) gosdk.RequestBodyStatus {
70+
return gosdk.RequestBodyStatusContinue
71+
}
72+
73+
// ResponseHeaders implements [gosdk.HttpFilter].
74+
func (p *delayFilter) ResponseHeaders(e gosdk.EnvoyHttpFilter, endOfStream bool) gosdk.ResponseHeadersStatus {
75+
// Add a response header to indicate the delay.
76+
if p.delayLapsed > 0 {
77+
e.SetResponseHeader("x-delay-filter-lapsed", []byte(p.delayLapsed.String()))
78+
}
79+
return gosdk.ResponseHeadersStatusContinue
80+
}
81+
82+
// ResponseBody implements [gosdk.HttpFilter].
83+
func (p *delayFilter) ResponseBody(e gosdk.EnvoyHttpFilter, endOfStream bool) gosdk.ResponseBodyStatus {
84+
return gosdk.ResponseBodyStatusContinue
85+
}

go/gosdk/abi.go

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,31 @@ bool envoy_dynamic_module_callback_http_filter_get_attribute_string(
127127
uintptr_t filter_envoy_ptr,
128128
size_t attribute_id,
129129
uintptr_t* result, size_t* result_length);
130+
131+
#cgo noescape envoy_dynamic_module_callback_http_filter_continue_decoding
132+
#cgo nocallback envoy_dynamic_module_callback_http_filter_continue_decoding
133+
void envoy_dynamic_module_callback_http_filter_continue_decoding(
134+
uintptr_t filter_envoy_ptr);
135+
136+
#cgo noescape envoy_dynamic_module_callback_http_filter_continue_encoding
137+
#cgo nocallback envoy_dynamic_module_callback_http_filter_continue_encoding
138+
void envoy_dynamic_module_callback_http_filter_continue_encoding(
139+
uintptr_t filter_envoy_ptr);
140+
141+
#cgo noescape envoy_dynamic_module_callback_http_filter_scheduler_new
142+
#cgo nocallback envoy_dynamic_module_callback_http_filter_scheduler_new
143+
uintptr_t envoy_dynamic_module_callback_http_filter_scheduler_new(
144+
uintptr_t filter_envoy_ptr);
145+
146+
#cgo noescape envoy_dynamic_module_callback_http_filter_scheduler_delete
147+
#cgo nocallback envoy_dynamic_module_callback_http_filter_scheduler_delete
148+
void envoy_dynamic_module_callback_http_filter_scheduler_delete(
149+
uintptr_t scheduler_ptr);
150+
151+
#cgo noescape envoy_dynamic_module_callback_http_filter_scheduler_commit
152+
#cgo nocallback envoy_dynamic_module_callback_http_filter_scheduler_commit
153+
void envoy_dynamic_module_callback_http_filter_scheduler_commit(
154+
uintptr_t scheduler_ptr, uint64_t event_id);
130155
*/
131156
import "C"
132157

@@ -274,7 +299,9 @@ func envoy_dynamic_module_on_http_filter_scheduled(
274299
filterEnvoyPtr uintptr,
275300
filterModulePtr uintptr,
276301
eventID C.uint64_t) {
277-
panic("TODO")
302+
pinned := unwrapPinnedHttpFilter(uintptr(filterModulePtr))
303+
// Call the Scheduled method of the filter.
304+
pinned.obj.Sheduled(envoyFilter{raw: uintptr(filterEnvoyPtr)}, uint64(eventID))
278305
}
279306

280307
// GetRequestHeader implements [EnvoyHttpFilter].
@@ -396,6 +423,40 @@ type envoySlice struct {
396423
// envoyFilter implements [EnvoyHttpFilter].
397424
type envoyFilter struct{ raw uintptr }
398425

426+
// ContinueRequest implements EnvoyHttpFilter.
427+
func (e envoyFilter) ContinueRequest() {
428+
C.envoy_dynamic_module_callback_http_filter_continue_decoding(C.uintptr_t(e.raw))
429+
}
430+
431+
// ContinueResponse implements EnvoyHttpFilter.
432+
func (e envoyFilter) ContinueResponse() {
433+
C.envoy_dynamic_module_callback_http_filter_continue_encoding(C.uintptr_t(e.raw))
434+
}
435+
436+
// NewScheduler implements EnvoyHttpFilter.
437+
func (e envoyFilter) NewScheduler() Scheduler {
438+
// Create a new scheduler for the filter.
439+
schedulerPtr := C.envoy_dynamic_module_callback_http_filter_scheduler_new(C.uintptr_t(e.raw))
440+
if schedulerPtr == 0 {
441+
return nil
442+
}
443+
return &envoyFilterScheduler{raw: uintptr(schedulerPtr)}
444+
}
445+
446+
type envoyFilterScheduler struct {
447+
raw uintptr
448+
}
449+
450+
// Close implements Scheduler.
451+
func (e *envoyFilterScheduler) Close() {
452+
C.envoy_dynamic_module_callback_http_filter_scheduler_delete(C.uintptr_t(e.raw))
453+
}
454+
455+
// Commit implements Scheduler.
456+
func (e *envoyFilterScheduler) Commit(eventID uint64) {
457+
C.envoy_dynamic_module_callback_http_filter_scheduler_commit(C.uintptr_t(e.raw), C.uint64_t(eventID))
458+
}
459+
399460
// GetRequestProtocol implements [EnvoyHttpFilter].
400461
func (e envoyFilter) GetRequestProtocol() string {
401462
// https://github.com/envoyproxy/envoy/blob/05223ee2cd143d70b32402783c2a866a9dd18bd1/source/extensions/dynamic_modules/abi.h#L237-L372

go/gosdk/gosdk.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ type HttpFilterConfig interface {
2727
// This is passed to each event hook of the HttpFilter.
2828
//
2929
// **WARNING**: This must not outlive each event hook since there's no guarantee that the EnvoyHttpFilter will be valid after the event hook is returned.
30+
// To perform the asynchronous operations, use [EnvoyHttpFilter.NewScheduler] to create a [Scheduler] and perform the operations in a separate Goroutine.
31+
// Then, use the [Scheduler.Commit] method to commit the event to the Envoy filter on the correct worker thread to continue processing the request.
3032
type EnvoyHttpFilter interface {
3133
// GetRequestHeader gets the first value of the request header. Returns the value and true if the header is found.
3234
GetRequestHeader(key string) (string, bool)
@@ -59,6 +61,26 @@ type EnvoyHttpFilter interface {
5961
GetSourceAddress() string
6062
// GetRequestProtocol gets the request protocol. This corresponds to `request.protocol` attribute https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/advanced/attributes.
6163
GetRequestProtocol() string
64+
// NewScheduler creates a new Scheduler that can be used to schedule events to the correct Envoy worker thread.
65+
// Created schedulers must be closed when they are no longer needed.
66+
//
67+
// Returns nil if this is called from any other than normal event hooks such as RequestHeaders, RequestBody, ResponseHeaders, and ResponseBody.
68+
NewScheduler() Scheduler
69+
// ContinueRequest continues the request processing after the Stop variants are returned from the normal event hooks such as RequestHeaders, RequestBody, ResponseHeaders, and ResponseBody.
70+
// Mainly this is intented to be used during the HttpFilter.Scheduled method being called.
71+
ContinueRequest()
72+
// ContinueResponse is the same as ContinueRequest but for the response processing.
73+
ContinueResponse()
74+
}
75+
76+
// Scheduler is an interface that can be used to schedule a generic event to the correct Envoy worker thread.
77+
//
78+
// This is created via [EnvoyHttpFilter.NewScheduler] and can be passed across Goroutines.
79+
type Scheduler interface {
80+
// Commit commits the event to the Envoy filter on the correct worker thread.
81+
// The eventID is a unique identifier for the event, and it can be used to distinguish between different events.
82+
Commit(eventID uint64)
83+
Close()
6284
}
6385

6486
// HttpFilter is an interface that represents each Http request.
@@ -77,6 +99,9 @@ type HttpFilter interface {
7799
ResponseBody(e EnvoyHttpFilter, endOfStream bool) ResponseBodyStatus
78100
// TODO: add ResponseTrailers support.
79101

102+
// Scheuled is called when the filter is scheduled to run.
103+
Sheduled(e EnvoyHttpFilter, eventID uint64)
104+
80105
// Destroy is called when the stream is destroyed.
81106
Destroy()
82107
}

go/header_auth.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ func (p headerAuthFilterConfig) NewFilter() gosdk.HttpFilter {
2828
return &headerAuthFilter{authHeaderName: p.authHeaderName}
2929
}
3030

31+
// Sheduled implements gosdk.HttpFilter.
32+
func (p headerAuthFilter) Sheduled(gosdk.EnvoyHttpFilter, uint64) {}
33+
3134
// Destroy implements [gosdk.HttpFilter].
3235
func (p *headerAuthFilter) Destroy() {}
3336

go/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ func newHttpFilterConfig(name string, config []byte) gosdk.HttpFilterConfig {
1818
return passthroughFilterConfig{}
1919
case "header_auth":
2020
return headerAuthFilterConfig{authHeaderName: string(config)}
21+
case "delay":
22+
return delayFilterConfig{}
2123
default:
2224
panic("unknown filter: " + name)
2325
}

go/passthrough.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ func (p passthroughFilterConfig) Destroy() {}
2020
// NewFilter implements [gosdk.HttpFilterConfig].
2121
func (p passthroughFilterConfig) NewFilter() gosdk.HttpFilter { return passthroughFilter{} }
2222

23+
// Sheduled implements gosdk.HttpFilter.
24+
func (p passthroughFilter) Sheduled(gosdk.EnvoyHttpFilter, uint64) {}
25+
2326
// Destroy implements [gosdk.HttpFilter].
2427
func (p passthroughFilter) Destroy() {}
2528

integration/envoy.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ static_resources:
3636
dynamic_module_config:
3737
name: rust_module
3838
filter_name: passthrough
39+
- name: dynamic_modules/conditional_deply
40+
typed_config:
41+
# https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/dynamic_modules/v3/dynamic_modules.proto#envoy-v3-api-msg-extensions-dynamic-modules-v3-dynamicmoduleconfig
42+
"@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_modules.v3.DynamicModuleFilter
43+
dynamic_module_config:
44+
name: go_module
45+
do_not_close: true
46+
filter_name: delay
3947
- name: dynamic_modules/access_logger
4048
typed_config:
4149
# https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/dynamic_modules/v3/dynamic_modules.proto#envoy-v3-api-msg-extensions-dynamic-modules-v3-dynamicmoduleconfig

integration/main_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,44 @@ func TestIntegration(t *testing.T) {
126126
}, 30*time.Second, 1*time.Second)
127127
})
128128

129+
t.Run("delay", func(t *testing.T) {
130+
require.Eventually(t, func() bool {
131+
req, err := http.NewRequest("GET", "http://localhost:1062/headers", nil)
132+
require.NoError(t, err)
133+
req.Header.Set("do-delay", "true")
134+
135+
resp, err := http.DefaultClient.Do(req)
136+
if err != nil {
137+
t.Logf("Envoy not ready yet: %v", err)
138+
return false
139+
}
140+
defer func() {
141+
require.NoError(t, resp.Body.Close())
142+
}()
143+
body, err := io.ReadAll(resp.Body)
144+
if err != nil {
145+
t.Logf("Envoy not ready yet: %v", err)
146+
return false
147+
}
148+
149+
t.Logf("response: headers=%v, body=%s", resp.Header, string(body))
150+
require.Equal(t, 200, resp.StatusCode)
151+
152+
// Check the request header "delay-filter-on-scheduled: yes" added in the Sheduled phase.
153+
type httpBinHeadersBody struct {
154+
Headers map[string][]string `json:"headers"`
155+
}
156+
var headersBody httpBinHeadersBody
157+
require.NoError(t, json.Unmarshal(body, &headersBody))
158+
require.Contains(t, headersBody.Headers["Delay-Filter-On-Scheduled"], "yes")
159+
160+
// We also need to check that the response headers were added.
161+
require.NotEmpty(t, resp.Header.Get("x-delay-filter-lapsed"), "x-delay-filter-lapsed header should be set")
162+
require.Regexp(t, `^2\.\d+s$`, resp.Header.Get("x-delay-filter-lapsed"), "x-delay-filter-lapsed header should be around 2s")
163+
return true
164+
}, 30*time.Second, 200*time.Millisecond)
165+
})
166+
129167
t.Run("http_header_mutation", func(t *testing.T) {
130168
require.Eventually(t, func() bool {
131169
req, err := http.NewRequest("GET", "http://localhost:1062/headers", nil)

0 commit comments

Comments
 (0)