From a6616d8ff4ff2d45a5d8ecb53644cb1a2b8a4a2b Mon Sep 17 00:00:00 2001 From: dolookl <346676303@qq.com> Date: Thu, 25 Jul 2024 10:06:52 +0800 Subject: [PATCH 01/10] stack exchange broadcast message filter by room --- stackexchange/redis/stackexchange_redis.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/stackexchange/redis/stackexchange_redis.go b/stackexchange/redis/stackexchange_redis.go index c19c6d3..63caf13 100644 --- a/stackexchange/redis/stackexchange_redis.go +++ b/stackexchange/redis/stackexchange_redis.go @@ -206,7 +206,13 @@ func (exc *StackExchange) OnConnect(c *neffos.Conn) error { msg := c.DeserializeMessage(neffos.TextMessage, redisMsg.Message) msg.FromStackExchange = true - c.Write(msg) + if len(msg.Namespace) > 0 && len(msg.Room) > 0 { + if room := c.Namespace(msg.Namespace).Room(msg.Room); room != nil { + c.Write(msg) + } + } else { + c.Write(msg) + } } }() From 58b1de5deaf28630ac6ccaf308f65b7012378bfe Mon Sep 17 00:00:00 2001 From: dolookl <346676303@qq.com> Date: Wed, 22 Jan 2025 13:59:42 +0800 Subject: [PATCH 02/10] extends the redis stack exchange for multiplex --- go.mod | 9 +- go.sum | 23 +- stackexchange.go | 11 + stackexchange/nats/stackexchange_nats.go | 19 ++ stackexchange/redis/stackexchange_redis.go | 243 ++++++++++++++++++++- 5 files changed, 280 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index a0e562a..eb819c7 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,13 @@ module github.com/kataras/neffos go 1.21 require ( + github.com/bytedance/gopkg v0.1.2-0.20241212062526-165e60aa2d41 github.com/gobwas/ws v1.3.2 github.com/gorilla/websocket v1.5.1 github.com/iris-contrib/go.uuid v2.0.0+incompatible github.com/mediocregopher/radix/v3 v3.8.1 github.com/nats-io/nats.go v1.31.0 - golang.org/x/sync v0.6.0 + golang.org/x/sync v0.8.0 ) require ( @@ -17,9 +18,9 @@ require ( github.com/klauspost/compress v1.17.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect - golang.org/x/crypto v0.18.0 // indirect - golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.16.0 // indirect + golang.org/x/crypto v0.22.0 // indirect + golang.org/x/net v0.24.0 // indirect + golang.org/x/sys v0.19.0 // indirect golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) diff --git a/go.sum b/go.sum index 9948677..c5a750f 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/bytedance/gopkg v0.1.2-0.20241212062526-165e60aa2d41 h1:GeEIwFve9/hJEBkMfvmzFT8LnklciMooz+eVP6sZDfI= +github.com/bytedance/gopkg v0.1.2-0.20241212062526-165e60aa2d41/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= @@ -27,18 +29,21 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= -golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/stackexchange.go b/stackexchange.go index c355e91..3655a6a 100644 --- a/stackexchange.go +++ b/stackexchange.go @@ -39,6 +39,9 @@ type StackExchange interface { // NotifyAsk should notify and unblock a subscribed connection for this // specific message, "token" is the neffos wait signal for this message. NotifyAsk(msg Message, token string) error + + // OnStackExchangeInit is called automatically when the server is initialized. + OnStackExchangeInit(namespaces Namespaces) } // StackExchangeInitializer is an optional interface for a `StackExchange`. @@ -53,6 +56,8 @@ type StackExchangeInitializer interface { func stackExchangeInit(s StackExchange, namespaces Namespaces) error { if s != nil { + s.OnStackExchangeInit(namespaces) + if sinit, ok := s.(StackExchangeInitializer); ok { return sinit.Init(namespaces) } @@ -127,3 +132,9 @@ func (s *stackExchangeWrapper) Unsubscribe(c *Conn, namespace string) { s.parent.Unsubscribe(c, namespace) s.current.Unsubscribe(c, namespace) } + +// OnStackExchangeInit is called automatically when the server is initialized. +func (s *stackExchangeWrapper) OnStackExchangeInit(namespaces Namespaces) { + s.parent.OnStackExchangeInit(namespaces) + s.current.OnStackExchangeInit(namespaces) +} diff --git a/stackexchange/nats/stackexchange_nats.go b/stackexchange/nats/stackexchange_nats.go index 659800d..3230d21 100644 --- a/stackexchange/nats/stackexchange_nats.go +++ b/stackexchange/nats/stackexchange_nats.go @@ -28,6 +28,9 @@ type StackExchange struct { subscribe chan subscribeAction unsubscribe chan unsubscribeAction delSubscriber chan closeAction + + allowNativeMessages bool + shouldHandleOnlyNativeMessages bool } var _ neffos.StackExchange = (*StackExchange)(nil) @@ -377,3 +380,19 @@ func (exc *StackExchange) Unsubscribe(c *neffos.Conn, namespace string) { func (exc *StackExchange) OnDisconnect(c *neffos.Conn) { exc.delSubscriber <- closeAction{conn: c} } + +// OnStackExchangeInit is called automatically when the server is initialized. +func (exc *StackExchange) OnStackExchangeInit(namespaces neffos.Namespaces) { + if emptyNamespace := namespaces[""]; emptyNamespace != nil && emptyNamespace[neffos.OnNativeMessage] != nil { + exc.allowNativeMessages = true + + // if allow native messages and only this namespace empty namespaces is registered (via Events{} for example) + // and the only one event is the `OnNativeMessage` + // then no need to call Connect(...) because: + // client-side can use raw websocket without the neffos.js library + // so no access to connect to a namespace. + if len(namespaces) == 1 && len(emptyNamespace) == 1 { + exc.shouldHandleOnlyNativeMessages = true + } + } +} diff --git a/stackexchange/redis/stackexchange_redis.go b/stackexchange/redis/stackexchange_redis.go index 63caf13..18e56f5 100644 --- a/stackexchange/redis/stackexchange_redis.go +++ b/stackexchange/redis/stackexchange_redis.go @@ -3,8 +3,12 @@ package redis import ( "context" "math/rand" + "sync" "time" + "github.com/bytedance/gopkg/collection/skipmap" + "github.com/bytedance/gopkg/collection/zset" + uuid "github.com/iris-contrib/go.uuid" "github.com/kataras/neffos" "github.com/mediocregopher/radix/v3" @@ -30,6 +34,9 @@ type Config struct { // MaxActive defines the size connection pool. // Defaults to 10. MaxActive int + + // the limit number of websocket connections per redis connection (default: 1) + WsNumPerRedisConn int } // StackExchange is a `neffos.StackExchange` for redis. @@ -45,6 +52,14 @@ type StackExchange struct { subscribe chan subscribeAction unsubscribe chan unsubscribeAction delSubscriber chan closeAction + + multiplexAddSubscriber chan multiplexSubscriber + wsNumPerRedisConn int // the limit number of websocket connections per redis connection + subscriberZset *zset.Float64Set // value: subscriberID, score: number of websocket connections + subscriberSkipMap *skipmap.StringMap // key: subscriberID, value: *subscriber + + allowNativeMessages bool + shouldHandleOnlyNativeMessages bool } type ( @@ -52,6 +67,13 @@ type ( conn *neffos.Conn pubSub radix.PubSubConn msgCh chan<- radix.PubSubMessage + + isMultiplex bool // indicates if this subscriber is multiplexed + subscribedNs map[string]int // key: the namespaces that this subscriber subscribed to. value: number of websocket connections + closing bool // indicates if the subscriber is closing + id string // the subscriberID + conns *sync.Map // the websocket connections that subscribed. key: connID, value: *neffos.Conn + mu *sync.RWMutex // the mutex that used to sync the subscriber status } subscribeAction struct { @@ -67,6 +89,11 @@ type ( closeAction struct { conn *neffos.Conn } + + multiplexSubscriber struct { + conn *neffos.Conn + subscriber *subscriber + } ) var _ neffos.StackExchange = (*StackExchange)(nil) @@ -90,6 +117,10 @@ func NewStackExchange(cfg Config, channel string) (*StackExchange, error) { cfg.MaxActive = 10 } + if cfg.WsNumPerRedisConn < 1 { + cfg.WsNumPerRedisConn = 1 + } + var dialOptions []radix.DialOpt if cfg.Password != "" { @@ -141,6 +172,11 @@ func NewStackExchange(cfg Config, channel string) (*StackExchange, error) { delSubscriber: make(chan closeAction), subscribe: make(chan subscribeAction), unsubscribe: make(chan unsubscribeAction), + + multiplexAddSubscriber: make(chan multiplexSubscriber), + wsNumPerRedisConn: cfg.WsNumPerRedisConn, + subscriberZset: zset.NewFloat64(), + subscriberSkipMap: skipmap.NewString(), } go exc.run() @@ -154,26 +190,66 @@ func (exc *StackExchange) run() { case s := <-exc.addSubscriber: exc.subscribers[s.conn] = s // neffos.Debugf("[%s] added to potential subscribers", s.conn.ID()) + case s := <-exc.multiplexAddSubscriber: + exc.subscribers[s.conn] = s.subscriber case m := <-exc.subscribe: if sub, ok := exc.subscribers[m.conn]; ok { - channel := exc.getChannel(m.namespace, "", "") - sub.pubSub.PSubscribe(sub.msgCh, channel) - // neffos.Debugf("[%s] subscribed to [%s] for namespace [%s]", m.conn.ID(), channel, m.namespace) - // } else { - // neffos.Debugf("[%s] tried to subscribe to [%s] namespace before 'OnConnect.addSubscriber'!", m.conn.ID(), m.namespace) + if !sub.isMultiplex { + channel := exc.getChannel(m.namespace, "", "") + sub.pubSub.PSubscribe(sub.msgCh, channel) + // neffos.Debugf("[%s] subscribed to [%s] for namespace [%s]", m.conn.ID(), channel, m.namespace) + // } else { + // neffos.Debugf("[%s] tried to subscribe to [%s] namespace before 'OnConnect.addSubscriber'!", m.conn.ID(), m.namespace) + } else { + if count, has := sub.subscribedNs[m.namespace]; !has { + channel := exc.getChannel(m.namespace, "", "") + sub.pubSub.PSubscribe(sub.msgCh, channel) + sub.subscribedNs[m.namespace] = 1 + } else { + sub.subscribedNs[m.namespace] = count + 1 + } + } } case m := <-exc.unsubscribe: if sub, ok := exc.subscribers[m.conn]; ok { - channel := exc.getChannel(m.namespace, "", "") - // neffos.Debugf("[%s] unsubscribed from [%s]", channel) - sub.pubSub.PUnsubscribe(sub.msgCh, channel) + if !sub.isMultiplex { + channel := exc.getChannel(m.namespace, "", "") + // neffos.Debugf("[%s] unsubscribed from [%s]", channel) + sub.pubSub.PUnsubscribe(sub.msgCh, channel) + } else { + count := sub.subscribedNs[m.namespace] + if count < 2 { + delete(sub.subscribedNs, m.namespace) + channel := exc.getChannel(m.namespace, "", "") + sub.pubSub.PUnsubscribe(sub.msgCh, channel) + } else { + sub.subscribedNs[m.namespace] = count - 1 + } + } } case m := <-exc.delSubscriber: if sub, ok := exc.subscribers[m.conn]; ok { - // neffos.Debugf("[%s] disconnected", m.conn.ID()) - sub.pubSub.Close() - close(sub.msgCh) - delete(exc.subscribers, m.conn) + if !sub.isMultiplex { + // neffos.Debugf("[%s] disconnected", m.conn.ID()) + sub.pubSub.Close() + close(sub.msgCh) + delete(exc.subscribers, m.conn) + } else { + sub.mu.Lock() + sub.conns.Delete(m.conn.ID()) + if left, _ := exc.subscriberZset.IncrBy(-1, sub.id); left < 1 { + sub.pubSub.Close() + close(sub.msgCh) + sub.closing = true + exc.subscriberZset.Remove(sub.id) + exc.subscriberSkipMap.Delete(sub.id) + } else { + channel := exc.getChannel("", "", m.conn.ID()) + sub.pubSub.PUnsubscribe(sub.msgCh, channel) + } + delete(exc.subscribers, m.conn) + sub.mu.Unlock() + } } } } @@ -199,6 +275,18 @@ func (exc *StackExchange) getChannel(namespace, room, connID string) string { // It's called automatically after the neffos server's OnConnect (if any) // on incoming client connections. func (exc *StackExchange) OnConnect(c *neffos.Conn) error { + if exc.wsNumPerRedisConn == 1 { + return exc.onConnect(c) + } else { + return exc.multiplexOnConnect(c) + } +} + +// OnConnect prepares the connection redis subscriber +// and subscribes to itself for direct neffos messages. +// It's called automatically after the neffos server's OnConnect (if any) +// on incoming client connections. +func (exc *StackExchange) onConnect(c *neffos.Conn) error { redisMsgCh := make(chan radix.PubSubMessage) go func() { for redisMsg := range redisMsgCh { @@ -230,6 +318,121 @@ func (exc *StackExchange) OnConnect(c *neffos.Conn) error { return nil } +func (exc *StackExchange) multiplexOnConnect(c *neffos.Conn) error { + subs := exc.subscriberZset.Range(0, 0) + if len(subs) > 0 && subs[0].Score > 0 && subs[0].Score < float64(exc.wsNumPerRedisConn) { + // reuse existing subscriber + if existing, has := exc.subscriberSkipMap.Load(subs[0].Value); has { + if s, ok := existing.(*subscriber); ok && !s.closing { + s.mu.RLock() + if !s.closing { + // still avalible + var err error + selfChannel := exc.getChannel("", "", c.ID()) + if e := s.pubSub.PSubscribe(s.msgCh, selfChannel); e != nil { + err = e + } else { + s.conns.Store(c.ID(), c) + exc.subscriberZset.IncrBy(1, s.id) + + multiplexSub := multiplexSubscriber{ + conn: c, + subscriber: s, + } + exc.multiplexAddSubscriber <- multiplexSub + } + + s.mu.RUnlock() + return err + } + s.mu.RUnlock() + } + } + } + + redisMsgCh := make(chan radix.PubSubMessage) + uid, _ := uuid.NewV4() + s := &subscriber{ + // conn: c, + // pubSub: pubSub, + msgCh: redisMsgCh, + conns: &sync.Map{}, + id: uid.String(), + mu: &sync.RWMutex{}, + isMultiplex: true, + subscribedNs: make(map[string]int), + } + + go func() { + for redisMsg := range redisMsgCh { + // neffos.Debugf("[%s] send to client: [%s]", c.ID(), string(redisMsg.Message)) + msg := neffos.DeserializeMessage(neffos.TextMessage, redisMsg.Message, exc.allowNativeMessages, exc.shouldHandleOnlyNativeMessages) + msg.FromStackExchange = true + + if len(msg.To) > 0 { + // specify client + if spConn, has := s.conns.Load(msg.To); has { + if conn, ok := spConn.(*neffos.Conn); ok { + conn.Write(msg) + } + } + } else { + // broadcast + if len(msg.Namespace) > 0 { + if len(msg.Room) > 0 { + // broadcast to specify room + s.conns.Range(func(key, value interface{}) bool { + if conn, ok := value.(*neffos.Conn); ok { + if room := conn.Namespace(msg.Namespace).Room(msg.Room); room != nil { + conn.Write(msg) + } + } + return true + }) + } else { + // broadcast to specify namespace + s.conns.Range(func(key, value interface{}) bool { + if conn, ok := value.(*neffos.Conn); ok { + if ns := conn.Namespace(msg.Namespace); ns != nil { + conn.Write(msg) + } + } + return true + }) + } + } else { + // broadcast to all + s.conns.Range(func(key, value interface{}) bool { + if conn, ok := value.(*neffos.Conn); ok { + conn.Write(msg) + } + return true + }) + } + } + } + }() + + pubSub := radix.PersistentPubSub("", "", exc.connFunc) + s.pubSub = pubSub + selfChannel := exc.getChannel("", "", c.ID()) + if err := pubSub.PSubscribe(redisMsgCh, selfChannel); err != nil { + return err + } else { + s.conns.Store(c.ID(), c) + exc.subscriberSkipMap.Store(s.id, s) + exc.subscriberZset.IncrBy(1, s.id) + } + + multiplexSub := multiplexSubscriber{ + conn: c, + subscriber: s, + } + exc.multiplexAddSubscriber <- multiplexSub + + return nil +} + // Publish publishes messages through redis. // It's called automatically on neffos broadcasting. func (exc *StackExchange) Publish(msgs []neffos.Message) bool { @@ -314,3 +517,19 @@ func (exc *StackExchange) Unsubscribe(c *neffos.Conn, namespace string) { func (exc *StackExchange) OnDisconnect(c *neffos.Conn) { exc.delSubscriber <- closeAction{conn: c} } + +// OnStackExchangeInit is called automatically when the server is initialized. +func (exc *StackExchange) OnStackExchangeInit(namespaces neffos.Namespaces) { + if emptyNamespace := namespaces[""]; emptyNamespace != nil && emptyNamespace[neffos.OnNativeMessage] != nil { + exc.allowNativeMessages = true + + // if allow native messages and only this namespace empty namespaces is registered (via Events{} for example) + // and the only one event is the `OnNativeMessage` + // then no need to call Connect(...) because: + // client-side can use raw websocket without the neffos.js library + // so no access to connect to a namespace. + if len(namespaces) == 1 && len(emptyNamespace) == 1 { + exc.shouldHandleOnlyNativeMessages = true + } + } +} From 59cc0576899117923d23f9b3d63dd8ba4583cadc Mon Sep 17 00:00:00 2001 From: dolookl <346676303@qq.com> Date: Thu, 20 Feb 2025 15:48:13 +0800 Subject: [PATCH 03/10] leave namespace on conn close for multiplex redis stack --- conn.go | 4 +++- server.go | 16 ++++++++++++---- stackexchange.go | 8 ++++---- stackexchange/nats/stackexchange_nats.go | 7 ++++--- stackexchange/redis/stackexchange_redis.go | 20 +++++++++++++++++--- 5 files changed, 40 insertions(+), 15 deletions(-) diff --git a/conn.go b/conn.go index eb6ff01..715f5e6 100644 --- a/conn.go +++ b/conn.go @@ -1006,6 +1006,7 @@ func (c *Conn) ask(ctx context.Context, msg Message, mustWaitOnlyTheNextMessage // After this method call the `Conn` is not usable anymore, a new `Dial` call is required. func (c *Conn) Close() { if atomic.CompareAndSwapUint32(c.closed, 0, 1) { + var disconnectNamspaces []string if !c.shouldHandleOnlyNativeMessages { disconnectMsg := Message{Event: OnNamespaceDisconnect, IsForced: true, IsLocal: true} c.connectedNamespacesMutex.Lock() @@ -1016,6 +1017,7 @@ func (c *Conn) Close() { disconnectMsg.Namespace = ns.namespace ns.events.fireEvent(ns, disconnectMsg) delete(c.connectedNamespaces, namespace) + disconnectNamspaces = append(disconnectNamspaces, namespace) } c.connectedNamespacesMutex.Unlock() @@ -1030,7 +1032,7 @@ func (c *Conn) Close() { if !c.IsClient() { go func() { - c.server.disconnect <- c + c.server.disconnect <- disconnectAction{conn: c, namespaces: disconnectNamspaces} }() } diff --git a/server.go b/server.go index 5f0feea..d94165b 100644 --- a/server.go +++ b/server.go @@ -78,7 +78,7 @@ type Server struct { connections map[*Conn]struct{} connect chan *Conn - disconnect chan *Conn + disconnect chan disconnectAction actions chan action broadcastMessages chan []Message @@ -103,6 +103,13 @@ type Server struct { OnDisconnect func(c *Conn) } +type ( + disconnectAction struct { + conn *Conn + namespaces []string + } +) + // New constructs and returns a new neffos server. // Listens to incoming connections automatically, no further action is required from the caller. // The second parameter is the "connHandler", it can be @@ -121,7 +128,7 @@ func New(upgrader Upgrader, connHandler ConnHandler) *Server { writeTimeout: writeTimeout, connections: make(map[*Conn]struct{}), connect: make(chan *Conn, 1), - disconnect: make(chan *Conn), + disconnect: make(chan disconnectAction), actions: make(chan action), broadcastMessages: make(chan []Message), broadcaster: newBroadcaster(), @@ -172,7 +179,8 @@ func (s *Server) start() { case c := <-s.connect: s.connections[c] = struct{}{} atomic.AddUint64(&s.count, 1) - case c := <-s.disconnect: + case dc := <-s.disconnect: + c := dc.conn if _, ok := s.connections[c]; ok { // close(c.out) delete(s.connections, c) @@ -187,7 +195,7 @@ func (s *Server) start() { } if s.usesStackExchange() { - s.StackExchange.OnDisconnect(c) + s.StackExchange.OnDisconnect(c, dc.namespaces) } } case msgs := <-s.broadcastMessages: diff --git a/stackexchange.go b/stackexchange.go index 3655a6a..f4b7726 100644 --- a/stackexchange.go +++ b/stackexchange.go @@ -21,7 +21,7 @@ type StackExchange interface { // created on the `OnConnect` method. // It's called automatically when a connection goes offline, // manually by server or client or by network failure. - OnDisconnect(c *Conn) + OnDisconnect(c *Conn, namespaces []string) // Publish should publish messages through a stackexchange. // It's called automatically on neffos broadcasting. @@ -91,9 +91,9 @@ func (s *stackExchangeWrapper) OnConnect(c *Conn) error { return s.current.OnConnect(c) } -func (s *stackExchangeWrapper) OnDisconnect(c *Conn) { - s.parent.OnDisconnect(c) - s.current.OnDisconnect(c) +func (s *stackExchangeWrapper) OnDisconnect(c *Conn, namespaces []string) { + s.parent.OnDisconnect(c, namespaces) + s.current.OnDisconnect(c, namespaces) } func (s *stackExchangeWrapper) Publish(msgs []Message) bool { diff --git a/stackexchange/nats/stackexchange_nats.go b/stackexchange/nats/stackexchange_nats.go index 3230d21..4a0ea43 100644 --- a/stackexchange/nats/stackexchange_nats.go +++ b/stackexchange/nats/stackexchange_nats.go @@ -58,7 +58,8 @@ type ( } closeAction struct { - conn *neffos.Conn + conn *neffos.Conn + namespaces []string } ) @@ -377,8 +378,8 @@ func (exc *StackExchange) Unsubscribe(c *neffos.Conn, namespace string) { // closes the internal read messages channel. // It's called automatically when a connection goes offline, // manually by server or client or by network failure. -func (exc *StackExchange) OnDisconnect(c *neffos.Conn) { - exc.delSubscriber <- closeAction{conn: c} +func (exc *StackExchange) OnDisconnect(c *neffos.Conn, namespaces []string) { + exc.delSubscriber <- closeAction{conn: c, namespaces: namespaces} } // OnStackExchangeInit is called automatically when the server is initialized. diff --git a/stackexchange/redis/stackexchange_redis.go b/stackexchange/redis/stackexchange_redis.go index 18e56f5..70f2f43 100644 --- a/stackexchange/redis/stackexchange_redis.go +++ b/stackexchange/redis/stackexchange_redis.go @@ -87,7 +87,8 @@ type ( } closeAction struct { - conn *neffos.Conn + conn *neffos.Conn + namespaces []string } multiplexSubscriber struct { @@ -237,6 +238,19 @@ func (exc *StackExchange) run() { } else { sub.mu.Lock() sub.conns.Delete(m.conn.ID()) + + for _, ns := range m.namespaces { + if count, has := sub.subscribedNs[ns]; has { + if count < 2 { + delete(sub.subscribedNs, ns) + channel := exc.getChannel(ns, "", "") + sub.pubSub.PUnsubscribe(sub.msgCh, channel) + } else { + sub.subscribedNs[ns] = count - 1 + } + } + } + if left, _ := exc.subscriberZset.IncrBy(-1, sub.id); left < 1 { sub.pubSub.Close() close(sub.msgCh) @@ -514,8 +528,8 @@ func (exc *StackExchange) Unsubscribe(c *neffos.Conn, namespace string) { // closes the internal read messages channel. // It's called automatically when a connection goes offline, // manually by server or client or by network failure. -func (exc *StackExchange) OnDisconnect(c *neffos.Conn) { - exc.delSubscriber <- closeAction{conn: c} +func (exc *StackExchange) OnDisconnect(c *neffos.Conn, namespaces []string) { + exc.delSubscriber <- closeAction{conn: c, namespaces: namespaces} } // OnStackExchangeInit is called automatically when the server is initialized. From ebf0b0fa337eacd89ed360f33b8c6fac6833d104 Mon Sep 17 00:00:00 2001 From: dolookl <346676303@qq.com> Date: Tue, 25 Feb 2025 13:48:24 +0800 Subject: [PATCH 04/10] serialize/deserialize the neffos.Message's "To" field when sending message to the specific websocket connection exchanged by multiplex redis stack --- conn.go | 2 +- message.go | 99 +++++++++++++++------- message_test.go | 4 +- stackexchange/nats/stackexchange_nats.go | 4 +- stackexchange/redis/stackexchange_redis.go | 4 +- 5 files changed, 76 insertions(+), 37 deletions(-) diff --git a/conn.go b/conn.go index 715f5e6..69e1366 100644 --- a/conn.go +++ b/conn.go @@ -909,7 +909,7 @@ func (c *Conn) Write(msg Message) bool { } msg.FromExplicit = "" - return c.write(serializeMessage(msg), msg.SetBinary) + return c.write(serializeMessage(msg, false), msg.SetBinary) } // used when `Ask` caller cares only for successful call and not the message, for performance reasons we just use raw bytes. diff --git a/message.go b/message.go index 706c742..d9276d8 100644 --- a/message.go +++ b/message.go @@ -117,8 +117,8 @@ func (m *Message) isRoomLeft() bool { } // Serialize returns this message's transport format. -func (m Message) Serialize() []byte { - return serializeMessage(m) +func (m Message) Serialize(publishToMultiplexStackExchange bool) []byte { + return serializeMessage(m, publishToMultiplexStackExchange) } type ( @@ -274,7 +274,7 @@ func unescape(s string) string { return strings.Replace(s, messageFieldSeparatorReplacement, messageSeparatorString, -1) } -func serializeMessage(msg Message) (out []byte) { +func serializeMessage(msg Message, publishToMultiplexStackExchange bool) (out []byte) { if msg.IsNative && msg.wait == "" { out = msg.Body } else { @@ -286,13 +286,19 @@ func serializeMessage(msg Message) (out []byte) { msg.wait = msg.FromExplicit } - out = serializeOutput(msg.wait, escape(msg.Namespace), escape(msg.Room), escape(msg.Event), msg.Body, msg.Err, msg.isNoOp) + + var to string + if publishToMultiplexStackExchange && msg.To != "" { + to = msg.To + } + + out = serializeOutput(msg.wait, escape(msg.Namespace), escape(msg.Room), escape(to), escape(msg.Event), msg.Body, msg.Err, msg.isNoOp) } return out } -func serializeOutput(wait, namespace, room, event string, +func serializeOutput(wait, namespace, room, to, event string, body []byte, err error, isNoOp bool, @@ -321,15 +327,29 @@ func serializeOutput(wait, namespace, room, event string, waitByte = []byte(wait) } - msg := bytes.Join([][]byte{ // this number of fields should match the deserializer's, see `validMessageSepCount`. - waitByte, - []byte(namespace), - []byte(room), - []byte(event), - isErrorByte, - isNoOpByte, - body, - }, messageSeparator) + var msg []byte + if to == "" { + msg = bytes.Join([][]byte{ // this number of fields should match the deserializer's, see `validMessageSepCount`. + waitByte, + []byte(namespace), + []byte(room), + []byte(event), + isErrorByte, + isNoOpByte, + body, + }, messageSeparator) + } else { + msg = bytes.Join([][]byte{ // this number of fields should match the deserializer's, see `validMessageSepCount`. + waitByte, + []byte(namespace), + []byte(room), + []byte(to), + []byte(event), + isErrorByte, + isNoOpByte, + body, + }, messageSeparator) + } return msg } @@ -338,7 +358,7 @@ func serializeOutput(wait, namespace, room, event string, // and returns a neffos Message. // When allowNativeMessages only Body is filled and check about message format is skipped. func DeserializeMessage(msgTyp MessageType, b []byte, allowNativeMessages, shouldHandleOnlyNativeMessages bool) Message { - wait, namespace, room, event, body, err, isNoOp, isInvalid := deserializeInput(b, allowNativeMessages, shouldHandleOnlyNativeMessages) + wait, namespace, room, to, event, body, err, isNoOp, isInvalid := deserializeInput(b, allowNativeMessages, shouldHandleOnlyNativeMessages) fromExplicit := "" if isServerConnID(wait) { @@ -366,7 +386,7 @@ func DeserializeMessage(msgTyp MessageType, b []byte, allowNativeMessages, shoul from: "", FromExplicit: fromExplicit, FromStackExchange: fromStackExchange, - To: "", + To: unescape(to), IsForced: false, IsLocal: false, IsNative: allowNativeMessages && event == OnNativeMessage, @@ -419,6 +439,7 @@ func deserializeInput(b []byte, allowNativeMessages, shouldHandleOnlyNativeMessa wait, namespace, room, + to, event string, body []byte, err error, @@ -438,8 +459,8 @@ func deserializeInput(b []byte, allowNativeMessages, shouldHandleOnlyNativeMessa } // Note: Go's SplitN returns the remainder in[6] but JavasSript's string.split behaves differently. - dts := bytes.SplitN(b, messageSeparator, validMessageSepCount) - if len(dts) != validMessageSepCount { + dts := bytes.SplitN(b, messageSeparator, validMessageSepCount+1) + if len(dts) != validMessageSepCount && len(dts) != validMessageSepCount+1 { if !allowNativeMessages { isInvalid = true return @@ -450,18 +471,36 @@ func deserializeInput(b []byte, allowNativeMessages, shouldHandleOnlyNativeMessa return } - wait = string(dts[0]) - namespace = string(dts[1]) - room = string(dts[2]) - event = string(dts[3]) - isError := bytes.Equal(dts[4], trueByte) - isNoOp = bytes.Equal(dts[5], trueByte) - if b := dts[6]; len(b) > 0 { - if isError { - errorText := string(b) - err = resolveError(errorText) - } else { - body = b // keep it like that. + if len(dts) == validMessageSepCount { + wait = string(dts[0]) + namespace = string(dts[1]) + room = string(dts[2]) + event = string(dts[3]) + isError := bytes.Equal(dts[4], trueByte) + isNoOp = bytes.Equal(dts[5], trueByte) + if b := dts[6]; len(b) > 0 { + if isError { + errorText := string(b) + err = resolveError(errorText) + } else { + body = b // keep it like that. + } + } + } else { + wait = string(dts[0]) + namespace = string(dts[1]) + room = string(dts[2]) + to = string(dts[3]) + event = string(dts[4]) + isError := bytes.Equal(dts[5], trueByte) + isNoOp = bytes.Equal(dts[6], trueByte) + if b := dts[7]; len(b) > 0 { + if isError { + errorText := string(b) + err = resolveError(errorText) + } else { + body = b // keep it like that. + } } } diff --git a/message_test.go b/message_test.go index 55d2856..d91a3ef 100644 --- a/message_test.go +++ b/message_test.go @@ -83,7 +83,7 @@ func TestMessageSerialization(t *testing.T) { } for i, tt := range tests { - got := serializeMessage(tt.msg) + got := serializeMessage(tt.msg, false) if !bytes.Equal(got, tt.serialized) { t.Fatalf("[%d] serialize: expected %s but got %s", i, tt.serialized, got) } @@ -129,7 +129,7 @@ func TestMessageSerialization(t *testing.T) { expectedSerialized := []byte(fmt.Sprintf(";contains%ssemi;%sthis%sfor sure%s;thatdoesnot;0;0;", messageFieldSeparatorReplacement, messageFieldSeparatorReplacement, messageFieldSeparatorReplacement, messageFieldSeparatorReplacement)) - gotSerialized := serializeMessage(msg) + gotSerialized := serializeMessage(msg, false) if !bytes.Equal(expectedSerialized, gotSerialized) { t.Fatalf("expected escaped serialized to be: %s but got: %s", string(expectedSerialized), string(gotSerialized)) diff --git a/stackexchange/nats/stackexchange_nats.go b/stackexchange/nats/stackexchange_nats.go index 4a0ea43..96fdc71 100644 --- a/stackexchange/nats/stackexchange_nats.go +++ b/stackexchange/nats/stackexchange_nats.go @@ -301,7 +301,7 @@ func (exc *StackExchange) Publish(msgs []neffos.Message) bool { func (exc *StackExchange) publish(msg neffos.Message) bool { subject := exc.getSubject(msg.Namespace, msg.Room, msg.To) - b := msg.Serialize() + b := msg.Serialize(false) err := exc.publisher.Publish(subject, b) // Let's not add logging options, let @@ -346,7 +346,7 @@ func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token str // NotifyAsk notifies and unblocks a "msg" subscriber, called on a server connection's read when expects a result. func (exc *StackExchange) NotifyAsk(msg neffos.Message, token string) error { msg.ClearWait() - err := exc.publisher.Publish(token, msg.Serialize()) + err := exc.publisher.Publish(token, msg.Serialize(false)) if err != nil { return err } diff --git a/stackexchange/redis/stackexchange_redis.go b/stackexchange/redis/stackexchange_redis.go index 70f2f43..58475b9 100644 --- a/stackexchange/redis/stackexchange_redis.go +++ b/stackexchange/redis/stackexchange_redis.go @@ -464,7 +464,7 @@ func (exc *StackExchange) publish(msg neffos.Message) bool { channel := exc.getChannel(msg.Namespace, msg.Room, msg.To) // neffos.Debugf("[%s] publish to channel [%s] the data [%s]\n", msg.FromExplicit, channel, string(msg.Serialize())) - err := exc.publishCommand(channel, msg.Serialize()) + err := exc.publishCommand(channel, msg.Serialize(exc.wsNumPerRedisConn > 1)) return err == nil } @@ -501,7 +501,7 @@ func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token str // NotifyAsk notifies and unblocks a "msg" subscriber, called on a server connection's read when expects a result. func (exc *StackExchange) NotifyAsk(msg neffos.Message, token string) error { msg.ClearWait() - return exc.publishCommand(token, msg.Serialize()) + return exc.publishCommand(token, msg.Serialize(exc.wsNumPerRedisConn > 1)) } // Subscribe subscribes to a specific namespace, From 0ceecebbf8e0ee584736dfa68e55cb0aad072459 Mon Sep 17 00:00:00 2001 From: dolookl <346676303@qq.com> Date: Wed, 12 Mar 2025 10:56:12 +0800 Subject: [PATCH 05/10] add logger to server --- conn.go | 17 ++++++++++++++++- logger.go | 15 +++++++++++++++ server.go | 8 ++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 logger.go diff --git a/conn.go b/conn.go index 69e1366..9c3d1a8 100644 --- a/conn.go +++ b/conn.go @@ -3,6 +3,7 @@ package neffos import ( "context" "errors" + "fmt" "net" "net/http" "sync" @@ -325,6 +326,7 @@ func (c *Conn) startReader() { b, msgTyp, err := c.socket.ReadData(c.readTimeout) if err != nil { c.readiness.unwait(err) + c.server.Logger.Error(fmt.Errorf("read data err. id:%s, err:%s", c.ID(), err.Error())) return } @@ -346,6 +348,7 @@ func (c *Conn) startReader() { } func (c *Conn) handleACK(msgTyp MessageType, b []byte) bool { + c.server.Logger.Debug(fmt.Sprintf("handle ACK, id:%s, msgTyp:%d, b:%s", c.ID(), msgTyp, string(b))) switch typ := b[0]; typ { case ackBinary: // from client startup to server. @@ -353,6 +356,7 @@ func (c *Conn) handleACK(msgTyp MessageType, b []byte) bool { if err != nil { // it's not Ok, send error which client's Dial should return. c.write(append(ackNotOKBinaryB, []byte(err.Error())...), false) + c.server.Logger.Error(fmt.Errorf("ackBinary err. id:%s, err:%s", c.ID(), err.Error())) return false } atomic.StoreUint32(c.acknowledged, 1) @@ -488,7 +492,18 @@ func (c *Conn) DeserializeMessage(msgTyp MessageType, payload []byte) Message { // HandlePayload fires manually a local event based on the "payload". func (c *Conn) HandlePayload(msgTyp MessageType, payload []byte) error { - return c.handleMessage(c.DeserializeMessage(msgTyp, payload)) + msg := c.DeserializeMessage(msgTyp, payload) + if err := c.handleMessage(msg); err != nil { + if err == ErrInvalidPayload { + c.server.Logger.Error(fmt.Errorf("handle payload err. id:%s, msgType:%d, payload:%s, err:%s", c.ID(), msgTyp, string(payload), err.Error())) + } else { + c.server.Logger.Error(fmt.Errorf("handle payload err. id:%s, msgType:%d, namespace:%s, room:%s, event:%s, body:%s, err:%s", c.ID(), msgTyp, msg.Namespace, msg.Room, msg.Event, string(msg.Body), err.Error())) + } + + return err + } + + return nil } const syncWaitDur = 15 * time.Millisecond diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..acced0b --- /dev/null +++ b/logger.go @@ -0,0 +1,15 @@ +package neffos + +type logger interface { + // 调试日志 + Debug(msg string) + + // 提示 + Info(msg string) + + // 警告 + Warn(msg string) + + // 错误日志 + Error(err error) +} diff --git a/server.go b/server.go index d94165b..f5e7ac9 100644 --- a/server.go +++ b/server.go @@ -91,6 +91,8 @@ type Server struct { closed uint32 + Logger logger + // OnUpgradeError can be optionally registered to catch upgrade errors. OnUpgradeError func(err error) // OnConnect can be optionally registered to be notified for any new neffos client connection, @@ -141,6 +143,11 @@ func New(upgrader Upgrader, connHandler ConnHandler) *Server { return s } +// WithLogger set a logger to server +func (s *Server) WithLogger(logger logger) { + s.Logger = logger +} + // UseStackExchange can be used to add one or more StackExchange // to the server. // Returns a non-nil error when "exc" @@ -359,6 +366,7 @@ func (s *Server) Upgrade( if s.usesStackExchange() { if err := s.StackExchange.OnConnect(c); err != nil { c.readiness.unwait(err) + s.Logger.Error(err) return nil, err } } From 0756a3bb9f63280c296efd53349e7da287aeb195 Mon Sep 17 00:00:00 2001 From: dolookl <346676303@qq.com> Date: Wed, 12 Mar 2025 11:27:16 +0800 Subject: [PATCH 06/10] log optimization --- conn.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/conn.go b/conn.go index 9c3d1a8..15adae0 100644 --- a/conn.go +++ b/conn.go @@ -9,6 +9,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/gorilla/websocket" ) type ( @@ -326,7 +328,11 @@ func (c *Conn) startReader() { b, msgTyp, err := c.socket.ReadData(c.readTimeout) if err != nil { c.readiness.unwait(err) - c.server.Logger.Error(fmt.Errorf("read data err. id:%s, err:%s", c.ID(), err.Error())) + if websocket.IsCloseError(err, websocket.CloseNoStatusReceived) { + c.server.Logger.Debug(fmt.Sprintf("read data err. id:%s, err:%s", c.ID(), err.Error())) + } else { + c.server.Logger.Error(fmt.Errorf("read data err. id:%s, err:%s", c.ID(), err.Error())) + } return } From ae65fa0b60d17178cbbee4566c66be81c60c5e19 Mon Sep 17 00:00:00 2001 From: dolookl <346676303@qq.com> Date: Thu, 3 Apr 2025 17:23:13 +0800 Subject: [PATCH 07/10] log optmization --- conn.go | 38 +++++++++++++++++++++++++++----------- server.go | 4 +++- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/conn.go b/conn.go index 15adae0..44e7104 100644 --- a/conn.go +++ b/conn.go @@ -327,12 +327,22 @@ func (c *Conn) startReader() { for { b, msgTyp, err := c.socket.ReadData(c.readTimeout) if err != nil { - c.readiness.unwait(err) - if websocket.IsCloseError(err, websocket.CloseNoStatusReceived) { - c.server.Logger.Debug(fmt.Sprintf("read data err. id:%s, err:%s", c.ID(), err.Error())) - } else { - c.server.Logger.Error(fmt.Errorf("read data err. id:%s, err:%s", c.ID(), err.Error())) + if c.server.Logger != nil { + if !c.isAcknowledged() { + c.server.Logger.Error(fmt.Errorf("unacknowledged read data err. id:%s, err:%s", c.ID(), err.Error())) + } else if websocket.IsCloseError(err, websocket.CloseNoStatusReceived) { + c.server.Logger.Debug(fmt.Sprintf("read data err. id:%s, err:%s", c.ID(), err.Error())) + } else { + c.server.Logger.Error(fmt.Errorf("read data err. id:%s, err:%s", c.ID(), err.Error())) + } } + + c.readiness.unwait(err) + // if websocket.IsCloseError(err, websocket.CloseNoStatusReceived) { + // c.server.Logger.Debug(fmt.Sprintf("read data err. id:%s, err:%s", c.ID(), err.Error())) + // } else { + // c.server.Logger.Error(fmt.Errorf("read data err. id:%s, err:%s", c.ID(), err.Error())) + // } return } @@ -354,7 +364,9 @@ func (c *Conn) startReader() { } func (c *Conn) handleACK(msgTyp MessageType, b []byte) bool { - c.server.Logger.Debug(fmt.Sprintf("handle ACK, id:%s, msgTyp:%d, b:%s", c.ID(), msgTyp, string(b))) + if c.server.Logger != nil { + c.server.Logger.Debug(fmt.Sprintf("handle ACK, id:%s, msgTyp:%d, b:%s", c.ID(), msgTyp, string(b))) + } switch typ := b[0]; typ { case ackBinary: // from client startup to server. @@ -362,7 +374,9 @@ func (c *Conn) handleACK(msgTyp MessageType, b []byte) bool { if err != nil { // it's not Ok, send error which client's Dial should return. c.write(append(ackNotOKBinaryB, []byte(err.Error())...), false) - c.server.Logger.Error(fmt.Errorf("ackBinary err. id:%s, err:%s", c.ID(), err.Error())) + if c.server.Logger != nil { + c.server.Logger.Error(fmt.Errorf("ackBinary err. id:%s, err:%s", c.ID(), err.Error())) + } return false } atomic.StoreUint32(c.acknowledged, 1) @@ -500,10 +514,12 @@ func (c *Conn) DeserializeMessage(msgTyp MessageType, payload []byte) Message { func (c *Conn) HandlePayload(msgTyp MessageType, payload []byte) error { msg := c.DeserializeMessage(msgTyp, payload) if err := c.handleMessage(msg); err != nil { - if err == ErrInvalidPayload { - c.server.Logger.Error(fmt.Errorf("handle payload err. id:%s, msgType:%d, payload:%s, err:%s", c.ID(), msgTyp, string(payload), err.Error())) - } else { - c.server.Logger.Error(fmt.Errorf("handle payload err. id:%s, msgType:%d, namespace:%s, room:%s, event:%s, body:%s, err:%s", c.ID(), msgTyp, msg.Namespace, msg.Room, msg.Event, string(msg.Body), err.Error())) + if c.server.Logger != nil { + if err == ErrInvalidPayload { + c.server.Logger.Error(fmt.Errorf("handle payload err. id:%s, msgType:%d, payload:%s, err:%s", c.ID(), msgTyp, string(payload), err.Error())) + } else { + c.server.Logger.Error(fmt.Errorf("handle payload err. id:%s, msgType:%d, namespace:%s, room:%s, event:%s, body:%s, err:%s", c.ID(), msgTyp, msg.Namespace, msg.Room, msg.Event, string(msg.Body), err.Error())) + } } return err diff --git a/server.go b/server.go index f5e7ac9..18a574d 100644 --- a/server.go +++ b/server.go @@ -366,7 +366,9 @@ func (s *Server) Upgrade( if s.usesStackExchange() { if err := s.StackExchange.OnConnect(c); err != nil { c.readiness.unwait(err) - s.Logger.Error(err) + if s.Logger != nil { + s.Logger.Error(err) + } return nil, err } } From 14989af0046e42b4d9f3a7f03b0e92d1358d94c9 Mon Sep 17 00:00:00 2001 From: dolookl <346676303@qq.com> Date: Wed, 16 Apr 2025 11:13:02 +0800 Subject: [PATCH 08/10] deadlock buf fixed on select delSubscriber channel and push subscriber to multiplexAddSubscriber channel --- stackexchange/redis/stackexchange_redis.go | 38 ++++++++++++++++------ 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/stackexchange/redis/stackexchange_redis.go b/stackexchange/redis/stackexchange_redis.go index 58475b9..db9e72e 100644 --- a/stackexchange/redis/stackexchange_redis.go +++ b/stackexchange/redis/stackexchange_redis.go @@ -5,7 +5,9 @@ import ( "math/rand" "sync" "time" + "unsafe" + "github.com/bytedance/gopkg/collection/lscq" "github.com/bytedance/gopkg/collection/skipmap" "github.com/bytedance/gopkg/collection/zset" uuid "github.com/iris-contrib/go.uuid" @@ -53,10 +55,11 @@ type StackExchange struct { unsubscribe chan unsubscribeAction delSubscriber chan closeAction - multiplexAddSubscriber chan multiplexSubscriber - wsNumPerRedisConn int // the limit number of websocket connections per redis connection - subscriberZset *zset.Float64Set // value: subscriberID, score: number of websocket connections - subscriberSkipMap *skipmap.StringMap // key: subscriberID, value: *subscriber + multiplexAddSubscriber chan *multiplexSubscriber + wsNumPerRedisConn int // the limit number of websocket connections per redis connection + subscriberZset *zset.Float64Set // value: subscriberID, score: number of websocket connections + subscriberSkipMap *skipmap.StringMap // key: subscriberID, value: *subscriber + multiplexSubscriberQueue *lscq.PointerQueue // the concurrency safe FIFO queue that used to add multiplex subscriber allowNativeMessages bool shouldHandleOnlyNativeMessages bool @@ -174,17 +177,30 @@ func NewStackExchange(cfg Config, channel string) (*StackExchange, error) { subscribe: make(chan subscribeAction), unsubscribe: make(chan unsubscribeAction), - multiplexAddSubscriber: make(chan multiplexSubscriber), - wsNumPerRedisConn: cfg.WsNumPerRedisConn, - subscriberZset: zset.NewFloat64(), - subscriberSkipMap: skipmap.NewString(), + multiplexAddSubscriber: make(chan *multiplexSubscriber), + wsNumPerRedisConn: cfg.WsNumPerRedisConn, + subscriberZset: zset.NewFloat64(), + subscriberSkipMap: skipmap.NewString(), + multiplexSubscriberQueue: lscq.NewPointer(), } go exc.run() + go exc.readMultiplexSubscriber() return exc, nil } +func (exc *StackExchange) readMultiplexSubscriber() { + for { + if p, ok := exc.multiplexSubscriberQueue.Dequeue(); ok { + multiplexSub := (*multiplexSubscriber)(p) + exc.multiplexAddSubscriber <- multiplexSub + } else { + time.Sleep(time.Millisecond * 100) + } + } +} + func (exc *StackExchange) run() { for { select { @@ -353,7 +369,8 @@ func (exc *StackExchange) multiplexOnConnect(c *neffos.Conn) error { conn: c, subscriber: s, } - exc.multiplexAddSubscriber <- multiplexSub + // exc.multiplexAddSubscriber <- multiplexSub + exc.multiplexSubscriberQueue.Enqueue(unsafe.Pointer(&multiplexSub)) } s.mu.RUnlock() @@ -442,7 +459,8 @@ func (exc *StackExchange) multiplexOnConnect(c *neffos.Conn) error { conn: c, subscriber: s, } - exc.multiplexAddSubscriber <- multiplexSub + // exc.multiplexAddSubscriber <- multiplexSub + exc.multiplexSubscriberQueue.Enqueue(unsafe.Pointer(&multiplexSub)) return nil } From b43c117d2ac2c39276598336caedb65644b151b4 Mon Sep 17 00:00:00 2001 From: dolookl <346676303@qq.com> Date: Wed, 16 Apr 2025 14:36:07 +0800 Subject: [PATCH 09/10] change subscribers to *sync.Map to make it can be store concurrency and with out a channel. That can ensure the subscriber exist in the map when event trigger --- stackexchange/redis/stackexchange_redis.go | 114 +++++++++++---------- 1 file changed, 61 insertions(+), 53 deletions(-) diff --git a/stackexchange/redis/stackexchange_redis.go b/stackexchange/redis/stackexchange_redis.go index db9e72e..b876a1d 100644 --- a/stackexchange/redis/stackexchange_redis.go +++ b/stackexchange/redis/stackexchange_redis.go @@ -5,9 +5,7 @@ import ( "math/rand" "sync" "time" - "unsafe" - "github.com/bytedance/gopkg/collection/lscq" "github.com/bytedance/gopkg/collection/skipmap" "github.com/bytedance/gopkg/collection/zset" uuid "github.com/iris-contrib/go.uuid" @@ -48,18 +46,19 @@ type StackExchange struct { pool *radix.Pool connFunc radix.ConnFunc - subscribers map[*neffos.Conn]*subscriber + // subscribers map[*neffos.Conn]*subscriber + subscribers *sync.Map // key: *neffos.Conn, value: *subscriber - addSubscriber chan *subscriber + // addSubscriber chan *subscriber subscribe chan subscribeAction unsubscribe chan unsubscribeAction delSubscriber chan closeAction - multiplexAddSubscriber chan *multiplexSubscriber - wsNumPerRedisConn int // the limit number of websocket connections per redis connection - subscriberZset *zset.Float64Set // value: subscriberID, score: number of websocket connections - subscriberSkipMap *skipmap.StringMap // key: subscriberID, value: *subscriber - multiplexSubscriberQueue *lscq.PointerQueue // the concurrency safe FIFO queue that used to add multiplex subscriber + // multiplexAddSubscriber chan *multiplexSubscriber + wsNumPerRedisConn int // the limit number of websocket connections per redis connection + subscriberZset *zset.Float64Set // value: subscriberID, score: number of websocket connections + subscriberSkipMap *skipmap.StringMap // key: subscriberID, value: *subscriber + // multiplexSubscriberQueue *lscq.PointerQueue // the concurrency safe FIFO queue that used to add multiplex subscriber allowNativeMessages bool shouldHandleOnlyNativeMessages bool @@ -94,10 +93,10 @@ type ( namespaces []string } - multiplexSubscriber struct { - conn *neffos.Conn - subscriber *subscriber - } + // multiplexSubscriber struct { + // conn *neffos.Conn + // subscriber *subscriber + // } ) var _ neffos.StackExchange = (*StackExchange)(nil) @@ -171,46 +170,48 @@ func NewStackExchange(cfg Config, channel string) (*StackExchange, error) { // We could use multiple channels but overcomplicate things here. channel: channel, - subscribers: make(map[*neffos.Conn]*subscriber), - addSubscriber: make(chan *subscriber), + // subscribers: make(map[*neffos.Conn]*subscriber), + subscribers: &sync.Map{}, + // addSubscriber: make(chan *subscriber), delSubscriber: make(chan closeAction), subscribe: make(chan subscribeAction), unsubscribe: make(chan unsubscribeAction), - multiplexAddSubscriber: make(chan *multiplexSubscriber), - wsNumPerRedisConn: cfg.WsNumPerRedisConn, - subscriberZset: zset.NewFloat64(), - subscriberSkipMap: skipmap.NewString(), - multiplexSubscriberQueue: lscq.NewPointer(), + // multiplexAddSubscriber: make(chan *multiplexSubscriber), + wsNumPerRedisConn: cfg.WsNumPerRedisConn, + subscriberZset: zset.NewFloat64(), + subscriberSkipMap: skipmap.NewString(), + // multiplexSubscriberQueue: lscq.NewPointer(), } go exc.run() - go exc.readMultiplexSubscriber() + // go exc.readMultiplexSubscriber() return exc, nil } -func (exc *StackExchange) readMultiplexSubscriber() { - for { - if p, ok := exc.multiplexSubscriberQueue.Dequeue(); ok { - multiplexSub := (*multiplexSubscriber)(p) - exc.multiplexAddSubscriber <- multiplexSub - } else { - time.Sleep(time.Millisecond * 100) - } - } -} +// func (exc *StackExchange) readMultiplexSubscriber() { +// for { +// if p, ok := exc.multiplexSubscriberQueue.Dequeue(); ok { +// multiplexSub := (*multiplexSubscriber)(p) +// exc.multiplexAddSubscriber <- multiplexSub +// } else { +// time.Sleep(time.Millisecond * 100) +// } +// } +// } func (exc *StackExchange) run() { for { select { - case s := <-exc.addSubscriber: - exc.subscribers[s.conn] = s - // neffos.Debugf("[%s] added to potential subscribers", s.conn.ID()) - case s := <-exc.multiplexAddSubscriber: - exc.subscribers[s.conn] = s.subscriber + // case s := <-exc.addSubscriber: + // exc.subscribers[s.conn] = s + // // neffos.Debugf("[%s] added to potential subscribers", s.conn.ID()) + // case s := <-exc.multiplexAddSubscriber: + // exc.subscribers[s.conn] = s.subscriber case m := <-exc.subscribe: - if sub, ok := exc.subscribers[m.conn]; ok { + if sub, ok := exc.subscribers.Load(m.conn); ok { + sub := sub.(*subscriber) if !sub.isMultiplex { channel := exc.getChannel(m.namespace, "", "") sub.pubSub.PSubscribe(sub.msgCh, channel) @@ -228,7 +229,8 @@ func (exc *StackExchange) run() { } } case m := <-exc.unsubscribe: - if sub, ok := exc.subscribers[m.conn]; ok { + if sub, ok := exc.subscribers.Load(m.conn); ok { + sub := sub.(*subscriber) if !sub.isMultiplex { channel := exc.getChannel(m.namespace, "", "") // neffos.Debugf("[%s] unsubscribed from [%s]", channel) @@ -245,12 +247,14 @@ func (exc *StackExchange) run() { } } case m := <-exc.delSubscriber: - if sub, ok := exc.subscribers[m.conn]; ok { + if sub, ok := exc.subscribers.Load(m.conn); ok { + sub := sub.(*subscriber) if !sub.isMultiplex { // neffos.Debugf("[%s] disconnected", m.conn.ID()) sub.pubSub.Close() close(sub.msgCh) - delete(exc.subscribers, m.conn) + // delete(exc.subscribers, m.conn) + exc.subscribers.Delete(m.conn) } else { sub.mu.Lock() sub.conns.Delete(m.conn.ID()) @@ -277,7 +281,8 @@ func (exc *StackExchange) run() { channel := exc.getChannel("", "", m.conn.ID()) sub.pubSub.PUnsubscribe(sub.msgCh, channel) } - delete(exc.subscribers, m.conn) + // delete(exc.subscribers, m.conn) + exc.subscribers.Delete(m.conn) sub.mu.Unlock() } } @@ -343,7 +348,8 @@ func (exc *StackExchange) onConnect(c *neffos.Conn) error { selfChannel := exc.getChannel("", "", c.ID()) pubSub.PSubscribe(redisMsgCh, selfChannel) - exc.addSubscriber <- s + // exc.addSubscriber <- s + exc.subscribers.Store(c, s) return nil } @@ -365,12 +371,13 @@ func (exc *StackExchange) multiplexOnConnect(c *neffos.Conn) error { s.conns.Store(c.ID(), c) exc.subscriberZset.IncrBy(1, s.id) - multiplexSub := multiplexSubscriber{ - conn: c, - subscriber: s, - } + // multiplexSub := multiplexSubscriber{ + // conn: c, + // subscriber: s, + // } // exc.multiplexAddSubscriber <- multiplexSub - exc.multiplexSubscriberQueue.Enqueue(unsafe.Pointer(&multiplexSub)) + // exc.multiplexSubscriberQueue.Enqueue(unsafe.Pointer(&multiplexSub)) + exc.subscribers.Store(c, s) } s.mu.RUnlock() @@ -455,12 +462,13 @@ func (exc *StackExchange) multiplexOnConnect(c *neffos.Conn) error { exc.subscriberZset.IncrBy(1, s.id) } - multiplexSub := multiplexSubscriber{ - conn: c, - subscriber: s, - } - // exc.multiplexAddSubscriber <- multiplexSub - exc.multiplexSubscriberQueue.Enqueue(unsafe.Pointer(&multiplexSub)) + // multiplexSub := multiplexSubscriber{ + // conn: c, + // subscriber: s, + // } + // // exc.multiplexAddSubscriber <- multiplexSub + // exc.multiplexSubscriberQueue.Enqueue(unsafe.Pointer(&multiplexSub)) + exc.subscribers.Store(c, s) return nil } From 4f6e06f7844430d92a83b17b673416af4e50dc7e Mon Sep 17 00:00:00 2001 From: dolookl <346676303@qq.com> Date: Wed, 16 Apr 2025 14:54:14 +0800 Subject: [PATCH 10/10] don't unsubscribe the namespace when use multiplex subscriber, --- stackexchange/redis/stackexchange_redis.go | 38 ++++++++++++++-------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/stackexchange/redis/stackexchange_redis.go b/stackexchange/redis/stackexchange_redis.go index b876a1d..33b9286 100644 --- a/stackexchange/redis/stackexchange_redis.go +++ b/stackexchange/redis/stackexchange_redis.go @@ -236,12 +236,19 @@ func (exc *StackExchange) run() { // neffos.Debugf("[%s] unsubscribed from [%s]", channel) sub.pubSub.PUnsubscribe(sub.msgCh, channel) } else { - count := sub.subscribedNs[m.namespace] - if count < 2 { - delete(sub.subscribedNs, m.namespace) - channel := exc.getChannel(m.namespace, "", "") - sub.pubSub.PUnsubscribe(sub.msgCh, channel) - } else { + // count := sub.subscribedNs[m.namespace] + // if count < 2 { + // delete(sub.subscribedNs, m.namespace) + // channel := exc.getChannel(m.namespace, "", "") + // sub.pubSub.PUnsubscribe(sub.msgCh, channel) + // } else { + // sub.subscribedNs[m.namespace] = count - 1 + // } + + // when use multiplex subscriber, don't unsubscribe the namespace + // because in concurrency situation, the unsubscribe may happens before the subscribe + // that will make the conn use a subscriber that unsubscriber the namespace + if count, has := sub.subscribedNs[m.namespace]; has { sub.subscribedNs[m.namespace] = count - 1 } } @@ -261,13 +268,18 @@ func (exc *StackExchange) run() { for _, ns := range m.namespaces { if count, has := sub.subscribedNs[ns]; has { - if count < 2 { - delete(sub.subscribedNs, ns) - channel := exc.getChannel(ns, "", "") - sub.pubSub.PUnsubscribe(sub.msgCh, channel) - } else { - sub.subscribedNs[ns] = count - 1 - } + // if count < 2 { + // delete(sub.subscribedNs, ns) + // channel := exc.getChannel(ns, "", "") + // sub.pubSub.PUnsubscribe(sub.msgCh, channel) + // } else { + // sub.subscribedNs[ns] = count - 1 + // } + + // when use multiplex subscriber, don't unsubscribe the namespace + // because in concurrency situation, the unsubscribe may happens before the subscribe + // that will make the conn use a subscriber that unsubscriber the namespace + sub.subscribedNs[ns] = count - 1 } }