1
1
package main
2
2
3
3
import (
4
+ "flag"
4
5
"fmt"
5
6
"bytes"
6
7
"crypto/tls"
7
8
"io/ioutil"
8
- "log"
9
9
"net"
10
10
"net/http"
11
11
"os"
@@ -22,6 +22,7 @@ import (
22
22
"github.com/prometheus/client_golang/prometheus/promhttp"
23
23
24
24
"github.com/jessevdk/go-flags"
25
+ "k8s.io/klog/v2"
25
26
)
26
27
27
28
type Opts struct {
@@ -30,6 +31,7 @@ type Opts struct {
30
31
ListenAddress string `long:"listen" short:"l" env:"LISTEN_ADDRESS" description:"listen bind address" default:":8080"`
31
32
MetricsAddress string `long:"metrics_address" short:"m" env:"METRICS_ADDRESS" description:"metrics bind address" default:":9091"`
32
33
ConfigPath string `long:"config_path" short:"c" env:"CONFIG_PATH" description:"path to backend map file in yaml format" default:"/etc/backends.yaml"`
34
+ LogLevel int `long:"loglevel" short:"v" env:"LOG_LEVEL" description:"log verbosity level for klog" default:"2"`
33
35
}
34
36
35
37
// BackendConfig represents the configuration for each backend
@@ -134,17 +136,17 @@ func NewProxyServer(configPath string, queueSize, workerCount int) *ProxyServer
134
136
for sig := range signalCh {
135
137
switch sig {
136
138
case os .Interrupt :
137
- log . Printf ("Got interrput signal" )
139
+ klog . V ( 2 ). Infof ("Got interrput signal" )
138
140
case syscall .SIGTERM :
139
- log . Printf ("Got SIGTERM signal" )
141
+ klog . V ( 2 ). Infof ("Got SIGTERM signal" )
140
142
// deliver rest of queue to FINAL destination
141
143
case syscall .SIGQUIT :
142
- log . Printf ("Got SIGQUIT signal" )
144
+ klog . V ( 2 ). Infof ("Got SIGQUIT signal" )
143
145
// deliver rest of queue to FINAL destination
144
146
default :
145
- log . Printf ("Some signal received: %v\n " , sig )
147
+ klog . V ( 2 ). Infof ("Some signal received: %v\n " , sig )
146
148
}
147
- log . Printf ("Queue len is: %v\n " , ps .getQueueLen ())
149
+ klog . V ( 1 ). Infof ("Queue len is: %v\n " , ps .getQueueLen ())
148
150
}
149
151
}()
150
152
@@ -160,12 +162,12 @@ func (p *ProxyServer) getQueueLen() int {
160
162
func (p * ProxyServer ) loadConfig () error {
161
163
data , err := ioutil .ReadFile (p .configPath )
162
164
if err != nil {
163
- log .Fatal ("Error reading config file: %v\n " , err )
165
+ klog .Fatal ("Error reading config file: %v\n " , err )
164
166
}
165
167
166
168
var newConfig Config
167
169
if err := yaml .Unmarshal (data , & newConfig ); err != nil {
168
- log . Printf ("Error parsing config file: %v\n " , err )
170
+ klog . V ( 1 ). Infof ("Error parsing config file: %v\n " , err )
169
171
return err
170
172
}
171
173
@@ -183,19 +185,12 @@ func (p *ProxyServer) loadConfig() error {
183
185
}
184
186
}
185
187
if ! found {
186
- log . Printf ("Deleting old host %v\n " , key )
188
+ klog . V ( 2 ). Infof ("Deleting old host %v\n " , key )
187
189
p .config .Delete (key )
188
190
}
189
191
return true
190
192
})
191
- /* Printf("New map:")
192
- p.config.Range(func(key, value interface{}) bool {
193
- log.Printf("%v\n", key)
194
- log.Printf("%v\n", value)
195
- return true
196
- })
197
- */
198
- log .Println ("Configuration reloaded successfully" )
193
+ klog .V (4 ).Infof ("Configuration reloaded successfully" )
199
194
return nil
200
195
}
201
196
@@ -204,7 +199,7 @@ func (p *ProxyServer) reloadConfigPeriodically() {
204
199
for {
205
200
time .Sleep (30 * time .Second )
206
201
if err := p .loadConfig (); err != nil {
207
- log . Println ("Failed to reload config:" , err )
202
+ klog . V ( 1 ). Infof ("Failed to reload config:" , err )
208
203
}
209
204
}
210
205
}
@@ -219,7 +214,7 @@ func (p *ProxyServer) updateQueueLengthPeriodically() {
219
214
220
215
// worker processes requests from the queue
221
216
func (p * ProxyServer ) worker (id int ) {
222
- log . Printf ("Worker %d started\n " , id )
217
+ klog . V ( 2 ). Infof ("Worker %d started\n " , id )
223
218
for req := range p .queue {
224
219
p .proxyRequest (req ) // Process each request from the queue
225
220
}
@@ -273,7 +268,7 @@ func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {
273
268
274
269
if ! found || len (backends ) == 0 {
275
270
p .totalFailed .WithLabelValues (host ).Inc () // Increment failed request counter
276
- log . Printf ("Error host: '%v' not found in config file, droping request\n " , host )
271
+ klog . V ( 1 ). Infof ("Error host: '%v' not found in config file, droping request\n " , host )
277
272
return
278
273
}
279
274
@@ -283,6 +278,7 @@ func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {
283
278
for i := 0 ; i <= backend .Retries ; i ++ {
284
279
req , err := http .NewRequest (r .Req .Method , backend .Backend + r .Req .URL .Path , bytes .NewReader (r .Body ));
285
280
if err != nil {
281
+ klog .V (4 ).Infof ("Message failed for host %s with resp code %v error: %v\n " , host , err )
286
282
lastErr = err
287
283
continue
288
284
}
@@ -291,14 +287,16 @@ func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {
291
287
resp , err := client .Do (req )
292
288
// Read the response body to ensure the connection can be reused
293
289
if _ , err := io .Copy (io .Discard , resp .Body ); err != nil {
294
- log . Printf ("Failed to read %v response body: %v\n " , backend .Backend + r .Req .URL .Path , err )
290
+ klog . V ( 1 ). Infof ("Failed to read %v response body: %v\n " , backend .Backend + r .Req .URL .Path , err )
295
291
}
296
292
297
293
resp .Body .Close ()
298
294
if err == nil && resp .StatusCode < 400 {
299
295
// Successfully forwarded request
300
296
p .totalForwarded .WithLabelValues (host , backend .Backend ).Inc ()
301
297
return
298
+ } else {
299
+ klog .V (4 ).Infof ("Message failed for host %s with resp code %v error: %v\n " , host ,resp .StatusCode , err )
302
300
}
303
301
lastErr = err
304
302
p .totalRetries .WithLabelValues (host , backend .Backend ).Inc ()
@@ -308,7 +306,7 @@ func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {
308
306
309
307
// If we get here, all backends failed
310
308
p .totalFailed .WithLabelValues (host ).Inc ()
311
- log . Printf ("All backends failed for host %s: %v\n " , host , lastErr )
309
+ klog . V ( 1 ). Infof ("All backends failed for host %s: %v\n " , host , lastErr )
312
310
}
313
311
314
312
// handleIncomingRequest queues incoming requests
@@ -340,11 +338,14 @@ func getEnv(key string, defaultValue string) string {
340
338
}
341
339
342
340
func main () {
341
+ klog .InitFlags (nil )
342
+
343
343
var conf Opts
344
344
_ , err := flags .Parse (& conf )
345
345
if err != nil {
346
- log . Fatal ( err )
346
+ klog . Fatalf ( "Error parsing flags: %v" , err )
347
347
}
348
+ flag .Set ("v" , fmt .Sprintf ("%d" , conf .LogLevel ))
348
349
349
350
// Create a new proxy server with the path to the YAML config
350
351
proxy := NewProxyServer (conf .ConfigPath , conf .QueueSize , conf .WorkerCount ) // queue size = 100, worker count = 5
@@ -356,11 +357,17 @@ func main() {
356
357
go func () {
357
358
metricsMux := http .NewServeMux ()
358
359
metricsMux .Handle ("/metrics" , promhttp .Handler ())
359
- log .Printf ("Prometheus metrics server listening on %s\n " , conf .MetricsAddress )
360
- log .Fatal (http .ListenAndServe (conf .MetricsAddress , metricsMux ))
360
+ klog .V (1 ).Infof ("Prometheus metrics server listening on %s\n " , conf .MetricsAddress )
361
+ err := http .ListenAndServe (conf .MetricsAddress , metricsMux )
362
+ if err != nil {
363
+ klog .Fatalf ("Failed to start server: %v" , err ) // %v formats the error as a string
364
+ }
361
365
}()
362
366
363
367
// Start the proxy server
364
- log .Printf ("Proxy server is listening on %s\n " , conf .ListenAddress )
365
- log .Fatal (http .ListenAndServe (conf .ListenAddress , nil ))
368
+ klog .V (1 ).Infof ("Proxy server is listening on %s\n " , conf .ListenAddress )
369
+ err = http .ListenAndServe (conf .ListenAddress , nil )
370
+ if err != nil {
371
+ klog .Fatalf ("Failed to start server: %v" , err ) // %v formats the error as a string
372
+ }
366
373
}
0 commit comments