@@ -82,6 +82,7 @@ type RequestContext struct {
82
82
ResponseComplete bool
83
83
ResponseStatusCode string
84
84
RequestRunning bool
85
+ Request * Request
85
86
86
87
RequestState StreamRequestState
87
88
modelServerStreaming bool
@@ -95,6 +96,10 @@ type RequestContext struct {
95
96
respTrailerResp * extProcPb.ProcessingResponse
96
97
}
97
98
99
+ type Request struct {
100
+ Headers map [string ]string
101
+ Body map [string ]interface {}
102
+ }
98
103
type StreamRequestState int
99
104
100
105
const (
@@ -118,10 +123,14 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
118
123
// See https://github.com/envoyproxy/envoy/issues/17540.
119
124
reqCtx := & RequestContext {
120
125
RequestState : RequestReceived ,
126
+ Request : & Request {
127
+ Headers : make (map [string ]string ),
128
+ Body : make (map [string ]interface {}),
129
+ },
121
130
}
122
131
123
132
var body []byte
124
- var requestBody , responseBody map [string ]interface {}
133
+ var responseBody map [string ]interface {}
125
134
126
135
// Create error handling var as each request should only report once for
127
136
// error metrics. This doesn't cover the error "Cannot receive stream request" because
@@ -167,15 +176,17 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
167
176
// Message is buffered, we can read and decode.
168
177
if v .RequestBody .EndOfStream {
169
178
loggerTrace .Info ("decoding" )
170
- err = json .Unmarshal (body , & requestBody )
179
+ err = json .Unmarshal (body , & reqCtx . Request . Body )
171
180
if err != nil {
172
181
logger .V (logutil .DEFAULT ).Error (err , "Error unmarshaling request body" )
182
+ // TODO: short circuit and send the body back as is (this could be an envoy error), currently we drop
183
+ // whatever the body request would have been and send our immediate response instead.
173
184
}
174
185
175
186
// Body stream complete. Allocate empty slice for response to use.
176
187
body = []byte {}
177
188
178
- reqCtx , err = s .HandleRequestBody (ctx , reqCtx , req , requestBody )
189
+ reqCtx , err = s .HandleRequestBody (ctx , reqCtx )
179
190
if err != nil {
180
191
logger .V (logutil .DEFAULT ).Error (err , "Error handling body" )
181
192
} else {
@@ -256,7 +267,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
256
267
loggerTrace .Info ("stream completed" )
257
268
// Don't send a 500 on a response error. Just let the message passthrough and log our error for debugging purposes.
258
269
// We assume the body is valid JSON, err messages are not guaranteed to be json, and so capturing and sending a 500 obfuscates the response message.
259
- // using the standard 'err' var will send an immediate error response back to the caller.
270
+ // Using the standard 'err' var will send an immediate error response back to the caller.
260
271
var responseErr error
261
272
responseErr = json .Unmarshal (body , & responseBody )
262
273
if responseErr != nil {
0 commit comments