Skip to content

Commit 6656d92

Browse files
authored
fix: refactor friend request and group request handling by removing l… (#960)
* fix: refactor friend request and group request handling by removing local caching to relieve synchronization performance pressure. Signed-off-by: Gordon <[email protected]> * fix: refactor friend request and group request handling by removing local caching to relieve synchronization performance pressure. Signed-off-by: Gordon <[email protected]> * fix: refactor friend request and group request handling by removing local caching to relieve synchronization performance pressure. Signed-off-by: Gordon <[email protected]> * fix: refactor friend request and group request handling by removing local caching to relieve synchronization performance pressure. Signed-off-by: Gordon <[email protected]> * fix: refactor friend request and group request handling by removing local caching to relieve synchronization performance pressure. Signed-off-by: Gordon <[email protected]> * fix: refactor friend request and group request handling by removing local caching to relieve synchronization performance pressure. Signed-off-by: Gordon <[email protected]> * fix: group member sync and unread trigger. Signed-off-by: Gordon <[email protected]> * fix: message sync logic restore. Signed-off-by: Gordon <[email protected]> --------- Signed-off-by: Gordon <[email protected]>
1 parent 04b02fe commit 6656d92

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+455
-2134
lines changed

go.mod

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,25 @@ require golang.org/x/net v0.39.0 // indirect
1818

1919
require (
2020
github.com/google/go-cmp v0.6.0
21-
github.com/openimsdk/protocol v0.0.73-alpha.6
21+
github.com/hashicorp/golang-lru/v2 v2.0.7
22+
github.com/openimsdk/protocol v0.0.73-alpha.12
2223
github.com/openimsdk/tools v0.0.50-alpha.80
2324
github.com/patrickmn/go-cache v2.1.0+incompatible
24-
github.com/stretchr/testify v1.9.0
2525
golang.org/x/image v0.26.0
2626
golang.org/x/sync v0.13.0
2727
gorm.io/gorm v1.25.10
2828
)
2929

3030
require (
31-
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3231
github.com/jinzhu/inflection v1.0.0 // indirect
3332
github.com/jinzhu/now v1.1.5 // indirect
3433
github.com/lestrrat-go/strftime v1.0.6 // indirect
3534
github.com/mattn/go-sqlite3 v1.14.22 // indirect
36-
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
3735
go.uber.org/atomic v1.7.0 // indirect
3836
go.uber.org/multierr v1.6.0 // indirect
3937
go.uber.org/zap v1.24.0 // indirect
4038
golang.org/x/sys v0.32.0 // indirect
4139
golang.org/x/text v0.24.0 // indirect
4240
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
4341
google.golang.org/grpc v1.68.0 // indirect
44-
gopkg.in/yaml.v3 v3.0.1 // indirect
4542
)

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
1212
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
1313
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
1414
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
15+
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
16+
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
1517
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
1618
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
1719
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
@@ -28,8 +30,8 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah
2830
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
2931
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
3032
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
31-
github.com/openimsdk/protocol v0.0.73-alpha.6 h1:sna9coWG7HN1zObBPtvG0Ki/vzqHXiB4qKbA5P3w7kc=
32-
github.com/openimsdk/protocol v0.0.73-alpha.6/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
33+
github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk=
34+
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
3335
github.com/openimsdk/tools v0.0.50-alpha.80 h1:Nvt97Vm85CXr633Jf7WjRJeL2nxJJjwlZJFDgWWXkJU=
3436
github.com/openimsdk/tools v0.0.50-alpha.80/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
3537
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
@@ -67,8 +69,6 @@ google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0=
6769
google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA=
6870
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
6971
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
70-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
71-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
7272
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
7373
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
7474
gorm.io/driver/sqlite v1.5.5 h1:7MDMtUZhV065SilG62E0MquljeArQZNfJnjd9i9gx3E=

internal/conversation_msg/conversation_msg.go

Lines changed: 12 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"errors"
77
"fmt"
88
"math"
9-
"runtime/debug"
109
"sync"
1110

1211
"github.com/openimsdk/openim-sdk-core/v3/pkg/api"
@@ -57,7 +56,7 @@ type Conversation struct {
5756
msgKvListener func() open_im_sdk_callback.OnMessageKvInfoListener
5857
businessListener func() open_im_sdk_callback.OnCustomBusinessListener
5958
msgSyncerCh chan common.Cmd2Value
60-
conversationEventQueue *common.EventQueue
59+
conversationEventQueue chan common.Cmd2Value
6160
loginUserID string
6261
platform int32
6362
DataDir string
@@ -80,7 +79,7 @@ type Conversation struct {
8079
typing *typing
8180
}
8281

83-
func (c *Conversation) ConversationEventQueue() *common.EventQueue {
82+
func (c *Conversation) ConversationEventQueue() chan common.Cmd2Value {
8483
return c.conversationEventQueue
8584
}
8685

@@ -114,7 +113,7 @@ func (c *Conversation) SetBusinessListener(businessListener func() open_im_sdk_c
114113

115114
func NewConversation(
116115
longConnMgr *interaction.LongConnMgr,
117-
msgSyncerCh chan common.Cmd2Value, conversationEventQueue *common.EventQueue,
116+
msgSyncerCh chan common.Cmd2Value, conversationEventQueue chan common.Cmd2Value,
118117
relation *relation.Relation,
119118
group *group.Group, user *user.User,
120119
file *file.File) *Conversation {
@@ -209,6 +208,10 @@ func (c *Conversation) initSyncer() {
209208

210209
}
211210

211+
func (c *Conversation) GetCh() chan common.Cmd2Value {
212+
return c.conversationEventQueue
213+
}
214+
212215
type onlineMsgKey struct {
213216
ClientMsgID string
214217
ServerMsgID string
@@ -482,15 +485,13 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
482485
c.msgOffset += msgLen
483486
total := c2v.Value.(sdk_struct.CmdMsgSyncInReinstall).Total
484487

485-
insertMsg := make(map[string][2][]*model_struct.LocalChatLog, 10)
488+
insertMsg := make(map[string][]*model_struct.LocalChatLog, 10)
486489
conversationList := make([]*model_struct.LocalConversation, 0)
487490
var exceptionMsg []*model_struct.LocalChatLog
488491

489492
log.ZDebug(ctx, "message come here conversation ch in reinstalled", "conversation length", msgLen)
490493
b := time.Now()
491494

492-
groupMemberMap := make(map[string][]string, 10)
493-
494495
for conversationID, msgs := range allMsg {
495496
log.ZDebug(ctx, "parse message in one conversation", "conversationID",
496497
conversationID, "message length", len(msgs.Msgs))
@@ -501,9 +502,6 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
501502
continue
502503
}
503504
for _, v := range msgs.Msgs {
504-
if v.SessionType == constant.ReadGroupChatType {
505-
groupMemberMap[v.GroupID] = append(groupMemberMap[v.GroupID], v.SendID)
506-
}
507505

508506
log.ZDebug(ctx, "parse message ", "conversationID", conversationID, "msg", v)
509507
msg := &sdk_struct.MsgStruct{}
@@ -560,26 +558,11 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
560558
log.ZWarn(ctx, "latestMsg is nil", errs.New("latestMsg is nil"), "conversationID", conversationID)
561559
}
562560

563-
insertMsg[conversationID] = [2][]*model_struct.LocalChatLog{
564-
append(insertMessage, selfInsertMessage...),
565-
othersInsertMessage,
566-
}
567-
568-
}
569-
b1 := time.Now()
570-
// Synchronize the group members for this batch of messages
571-
groupIDs := datautil.Keys(groupMemberMap)
572-
err := c.group.IncrSyncGroupAndMember(ctx, groupIDs...)
573-
if err != nil {
574-
log.ZError(ctx, "IncrSyncGroupAndMember", err)
561+
insertMsg[conversationID] = append(insertMessage, c.faceURLAndNicknameHandle(ctx, selfInsertMessage, othersInsertMessage, conversationID)...)
575562
}
576-
log.ZDebug(ctx, "IncrSyncGroupAndMember", "cost time", time.Since(b1).Seconds(), "len", len(allMsg))
577-
b2 := time.Now()
578-
// Use the latest group member or user information to fill in the profile pictures and nicknames of the messages
579-
mergedInsertMsg := c.FillSenderProfileBatch(ctx, insertMsg)
580-
log.ZDebug(ctx, "FillSenderProfileBatch", "cost time", time.Since(b2).Seconds(), "len", len(allMsg))
563+
581564
// message storage
582-
_ = c.batchInsertMessageList(ctx, mergedInsertMsg)
565+
_ = c.batchInsertMessageList(ctx, insertMsg)
583566

584567
// conversation storage
585568
if err := c.db.BatchUpdateConversationList(ctx, conversationList); err != nil {
@@ -588,9 +571,7 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
588571
log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg))
589572

590573
// log.ZDebug(ctx, "progress is", "msgLen", msgLen, "msgOffset", c.msgOffset, "total", total, "now progress is", (c.msgOffset*(100-InitSyncProgress))/total + InitSyncProgress)
591-
if total > 0 {
592-
c.ConversationListener().OnSyncServerProgress((c.msgOffset*(100-InitSyncProgress))/total + InitSyncProgress)
593-
}
574+
c.ConversationListener().OnSyncServerProgress((c.msgOffset*(100-InitSyncProgress))/total + InitSyncProgress)
594575
//Exception message storage
595576
for _, v := range exceptionMsg {
596577
log.ZWarn(ctx, "exceptionMsg show: ", nil, "msg", *v)
@@ -945,25 +926,3 @@ func (c *Conversation) getUserNameAndFaceURL(ctx context.Context, userID string)
945926
}
946927
return userInfo.FaceURL, userInfo.Nickname, nil
947928
}
948-
949-
func (c *Conversation) ConsumeConversationEventLoop(ctx context.Context) {
950-
defer func() {
951-
if r := recover(); r != nil {
952-
err := fmt.Sprintf("panic: %+v\n%s", r, debug.Stack())
953-
log.ZWarn(ctx, "DoListener panic", nil, "panic info", err)
954-
}
955-
}()
956-
c.conversationEventQueue.ConsumeLoop(ctx, func(ctx context.Context, event *common.Event) {
957-
cmd, ok := event.Data.(common.Cmd2Value)
958-
if !ok {
959-
log.ZWarn(ctx, "invalid event data in conversationEventQueue", nil)
960-
return
961-
}
962-
963-
log.ZInfo(cmd.Ctx, "recv cmd", "caller", cmd.Caller, "cmd", cmd.Cmd, "value", cmd.Value)
964-
c.Work(cmd)
965-
log.ZInfo(cmd.Ctx, "done cmd", "caller", cmd.Caller, "cmd", cmd.Cmd, "value", cmd.Value)
966-
}, func(msg string, fields ...any) {
967-
log.ZError(ctx, msg, nil, fields...)
968-
})
969-
}

internal/conversation_msg/message_check.go

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -634,56 +634,3 @@ func (c *Conversation) groupHandle(ctx context.Context, self, others []*model_st
634634
}
635635

636636
}
637-
638-
func (c *Conversation) FillSenderProfileBatch(ctx context.Context, insertMsg map[string][2][]*model_struct.LocalChatLog) map[string][]*model_struct.LocalChatLog {
639-
if len(insertMsg) == 0 {
640-
return map[string][]*model_struct.LocalChatLog{}
641-
}
642-
643-
conversationIDs := datautil.Keys(insertMsg)
644-
645-
conversations, err := c.db.GetMultipleConversationDB(ctx, conversationIDs)
646-
if err != nil {
647-
log.ZError(ctx, "GetMultipleConversationDB failed", err)
648-
return mergeInsertMsg(insertMsg)
649-
}
650-
651-
conversationMap := datautil.SliceToMap(conversations, func(c *model_struct.LocalConversation) string {
652-
return c.ConversationID
653-
})
654-
655-
userInfo, err := c.db.GetLoginUser(ctx, c.loginUserID)
656-
if err != nil {
657-
log.ZError(ctx, "GetLoginUser failed", err)
658-
return mergeInsertMsg(insertMsg)
659-
}
660-
661-
merged := make(map[string][]*model_struct.LocalChatLog, len(insertMsg))
662-
for conversationID, pair := range insertMsg {
663-
if len(pair[0]) == 0 && len(pair[1]) == 0 {
664-
continue
665-
}
666-
lc, ok := conversationMap[conversationID]
667-
if !ok {
668-
merged[conversationID] = append(pair[0], pair[1]...)
669-
continue
670-
}
671-
switch lc.ConversationType {
672-
case constant.SingleChatType:
673-
c.singleHandle(ctx, pair[0], pair[1], lc, userInfo)
674-
case constant.ReadGroupChatType:
675-
c.groupHandle(ctx, pair[0], pair[1], lc)
676-
}
677-
merged[conversationID] = append(pair[0], pair[1]...)
678-
}
679-
680-
return merged
681-
}
682-
683-
func mergeInsertMsg(input map[string][2][]*model_struct.LocalChatLog) map[string][]*model_struct.LocalChatLog {
684-
result := make(map[string][]*model_struct.LocalChatLog, len(input))
685-
for convID, pair := range input {
686-
result[convID] = append(pair[0], pair[1]...)
687-
}
688-
return result
689-
}

internal/conversation_msg/notification.go

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,26 +67,23 @@ func (c *Conversation) Work(c2v common.Cmd2Value) {
6767
func (c *Conversation) syncFlag(c2v common.Cmd2Value) {
6868
ctx := c2v.Ctx
6969
syncFlag := c2v.Value.(sdk_struct.CmdNewMsgComeToConversation).SyncFlag
70-
seqs := c2v.Value.(sdk_struct.CmdNewMsgComeToConversation).Seqs
7170
switch syncFlag {
72-
case constant.AppDataSyncBegin, constant.LargeDataSyncBegin:
73-
log.ZDebug(ctx, "AppDataSyncBegin")
74-
c.seqs = seqs
71+
case constant.AppDataSyncStart:
72+
log.ZDebug(ctx, "AppDataSyncStart")
7573
c.startTime = time.Now()
7674
c.ConversationListener().OnSyncServerStart(true)
7775
c.ConversationListener().OnSyncServerProgress(1)
7876
asyncWaitFunctions := []func(c context.Context) error{
79-
c.group.IncrSyncJoinGroup,
77+
c.group.SyncAllJoinedGroupsAndMembersWithLock,
8078
c.relation.IncrSyncFriends,
8179
}
8280
runSyncFunctions(ctx, asyncWaitFunctions, asyncWait)
8381
c.addInitProgress(InitSyncProgress * 4 / 10) // add 40% of InitSyncProgress as progress
8482
c.ConversationListener().OnSyncServerProgress(c.progress) // notify server current Progress
8583

8684
syncWaitFunctions := []func(c context.Context) error{
87-
8885
c.IncrSyncConversations,
89-
c.SyncAllConUnreadAndCreateNewCon,
86+
c.SyncAllConversationHashReadSeqs,
9087
}
9188
runSyncFunctions(ctx, syncWaitFunctions, syncWait)
9289
log.ZWarn(ctx, "core data sync over", nil, "cost time", time.Since(c.startTime).Seconds())
@@ -96,20 +93,15 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) {
9693
asyncNoWaitFunctions := []func(c context.Context) error{
9794
c.user.SyncLoginUserInfoWithoutNotice,
9895
c.relation.SyncAllBlackListWithoutNotice,
99-
c.relation.SyncAllFriendApplicationWithoutNotice,
100-
c.relation.SyncAllSelfFriendApplicationWithoutNotice,
101-
c.group.SyncAllAdminGroupApplicationWithoutNotice,
102-
c.group.SyncAllSelfGroupApplicationWithoutNotice,
10396
}
10497
runSyncFunctions(ctx, asyncNoWaitFunctions, asyncNoWait)
10598

106-
case constant.AppDataSyncEnd, constant.LargeDataSyncEnd:
107-
log.ZDebug(ctx, "AppDataSyncEnd", "time", time.Since(c.startTime).Milliseconds())
99+
case constant.AppDataSyncFinish:
100+
log.ZDebug(ctx, "AppDataSyncFinish", "time", time.Since(c.startTime).Milliseconds())
108101
c.progress = 100
109102
c.ConversationListener().OnSyncServerProgress(c.progress)
110103
c.ConversationListener().OnSyncServerFinish(true)
111104
case constant.MsgSyncBegin:
112-
c.seqs = seqs
113105
log.ZDebug(ctx, "MsgSyncBegin")
114106
c.ConversationListener().OnSyncServerStart(false)
115107
c.syncData(c2v)
@@ -425,7 +417,7 @@ func (c *Conversation) syncData(c2v common.Cmd2Value) {
425417

426418
// Synchronous sync functions
427419
syncFuncs := []func(c context.Context) error{
428-
c.SyncAllConUnreadAndCreateNewCon,
420+
c.SyncAllConversationHashReadSeqs,
429421
}
430422

431423
runSyncFunctions(ctx, syncFuncs, syncWait)
@@ -434,11 +426,7 @@ func (c *Conversation) syncData(c2v common.Cmd2Value) {
434426
asyncFuncs := []func(c context.Context) error{
435427
c.user.SyncLoginUserInfo,
436428
c.relation.SyncAllBlackList,
437-
c.relation.SyncAllFriendApplication,
438-
c.relation.SyncAllSelfFriendApplication,
439-
c.group.SyncAllAdminGroupApplication,
440-
c.group.SyncAllSelfGroupApplication,
441-
c.group.IncrSyncJoinGroupWithLock,
429+
c.group.SyncAllJoinedGroupsAndMembersWithLock,
442430
c.relation.IncrSyncFriendsWithLock,
443431
c.IncrSyncConversationsWithLock,
444432
}

0 commit comments

Comments
 (0)