diff --git a/core/files/fileobject/migration.go b/core/files/fileobject/migration.go index 743232df01..ac8393cede 100644 --- a/core/files/fileobject/migration.go +++ b/core/files/fileobject/migration.go @@ -191,7 +191,7 @@ func (s *service) migrateDeriveObject(ctx context.Context, space clientspace.Spa err = nil } - err = s.addToSyncQueue(id, fullFileId, false, false) + err = s.addToSyncQueue(id, fullFileId, false, false, "", 0) if err != nil { return fmt.Errorf("add to sync queue: %w", err) } diff --git a/core/files/fileobject/service.go b/core/files/fileobject/service.go index ee17cfaa9b..a49a4055f8 100644 --- a/core/files/fileobject/service.go +++ b/core/files/fileobject/service.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sort" "sync" "time" @@ -34,6 +35,7 @@ import ( "github.com/anyproto/anytype-heart/pkg/lib/datastore/anystoreprovider" "github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore" "github.com/anyproto/anytype-heart/pkg/lib/logging" + "github.com/anyproto/anytype-heart/pkg/lib/mill" "github.com/anyproto/anytype-heart/pkg/lib/pb/model" "github.com/anyproto/anytype-heart/space" "github.com/anyproto/anytype-heart/space/clientspace" @@ -146,6 +148,7 @@ func (s *service) Init(a *app.App) error { migrationQueueStore, log.Desugar(), s.migrationQueueHandler, + nil, persistentqueue.WithContext(migrationQueueCtx), ) return nil @@ -233,7 +236,7 @@ func (s *service) ensureNotSyncedFilesAddedToQueue() error { fullId := extractFullFileIdFromDetails(record.Details) if record.Details.GetString(bundle.RelationKeyCreator) == s.accountService.MyParticipantId(fullId.SpaceId) { id := record.Details.GetString(bundle.RelationKeyId) - err := s.addToSyncQueue(id, fullId, false, false) + err := s.addToSyncQueue(id, fullId, false, false, "", 0) if err != nil { log.Errorf("add to sync queue: %v", err) } @@ -260,7 +263,7 @@ func (s *service) EnsureFileAddedToSyncQueue(id domain.FullID, details *domain.D SpaceId: id.SpaceID, FileId: 
domain.FileId(details.GetString(bundle.RelationKeyFileId)), } - err := s.addToSyncQueue(id.ObjectID, fullId, false, false) + err := s.addToSyncQueue(id.ObjectID, fullId, false, false, "", 0) return err } @@ -285,6 +288,11 @@ func (s *service) InitEmptyFileState(st *state.State) { fileblocks.InitEmptyFileState(st) } +type imageVariant struct { + variantId domain.FileId + size int64 +} + func (s *service) Create(ctx context.Context, spaceId string, req filemodels.CreateRequest) (id string, object *domain.Details, err error) { if !req.AsyncMetadataIndexing && len(req.FileVariants) == 0 { return "", nil, fmt.Errorf("file variants are not provided") @@ -299,7 +307,28 @@ func (s *service) Create(ctx context.Context, spaceId string, req filemodels.Cre if err != nil { return "", nil, fmt.Errorf("create in space: %w", err) } - err = s.addToSyncQueue(id, domain.FullFileId{SpaceId: space.Id(), FileId: req.FileId}, true, req.ObjectOrigin.IsImported()) + + var imageVariants []imageVariant + for _, variant := range req.FileVariants { + if variant.Mill == mill.ImageResizeId { + imageVariants = append(imageVariants, imageVariant{ + variantId: domain.FileId(variant.Hash), + size: variant.Size_, + }) + } + } + sort.Slice(imageVariants, func(i, j int) bool { + return imageVariants[i].size < imageVariants[j].size + }) + for idx, variant := range imageVariants { + priority := len(imageVariants) - idx + err = s.addToSyncQueue(id, domain.FullFileId{SpaceId: space.Id(), FileId: req.FileId}, true, req.ObjectOrigin.IsImported(), variant.variantId, priority) + if err != nil { + return "", nil, fmt.Errorf("add image variant to sync queue: %w", err) + } + } + + err = s.addToSyncQueue(id, domain.FullFileId{SpaceId: space.Id(), FileId: req.FileId}, true, req.ObjectOrigin.IsImported(), "", 0) if err != nil { return "", nil, fmt.Errorf("add to sync queue: %w", err) } @@ -423,8 +452,8 @@ func (s *service) CreateFromImport(fileId domain.FullFileId, origin objectorigin return fileObjectId, nil } 
-func (s *service) addToSyncQueue(objectId string, fileId domain.FullFileId, uploadedByUser bool, imported bool) error { - if err := s.fileSync.AddFile(objectId, fileId, uploadedByUser, imported); err != nil { +func (s *service) addToSyncQueue(objectId string, fileId domain.FullFileId, uploadedByUser bool, imported bool, prioritizeVariantId domain.FileId, priority int) error { + if err := s.fileSync.AddFile(objectId, fileId, uploadedByUser, imported, prioritizeVariantId, priority); err != nil { return fmt.Errorf("add file to sync queue: %w", err) } return nil diff --git a/core/files/files_test.go b/core/files/files_test.go index b9138fd8d3..d83991e852 100644 --- a/core/files/files_test.go +++ b/core/files/files_test.go @@ -129,7 +129,7 @@ func TestFileAdd(t *testing.T) { }) t.Run("check that file is uploaded to backup node", func(t *testing.T) { - err := fx.fileSyncService.AddFile("objectId1", domain.FullFileId{SpaceId: spaceId, FileId: got.FileId}, true, false) + err := fx.fileSyncService.AddFile("objectId1", domain.FullFileId{SpaceId: spaceId, FileId: got.FileId}, true, false, "", 0) require.NoError(t, err) <-uploaded infos, err := fx.rpcStore.FilesInfo(ctx, spaceId, got.FileId) diff --git a/core/files/filesync/filesync.go b/core/files/filesync/filesync.go index 5d231c8c46..ee038f987b 100644 --- a/core/files/filesync/filesync.go +++ b/core/files/filesync/filesync.go @@ -36,7 +36,7 @@ type LimitCallback func(fileObjectId string, fileId domain.FullFileId, bytesLeft type DeleteCallback func(fileObjectId domain.FullFileId) type FileSync interface { - AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool) (err error) + AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool, prioritizeVariantId domain.FileId, score int) (err error) UploadSynchronously(ctx context.Context, spaceId string, fileId domain.FileId) error OnUploadStarted(StatusCallback) OnUploaded(StatusCallback) @@ -119,25 +119,25 @@ func (s 
*fileSync) Init(a *app.App) (err error) { if err != nil { return fmt.Errorf("init uploading queue storage: %w", err) } - s.uploadingQueue = persistentqueue.New(uploadingQueueStorage, log.Logger, s.uploadingHandler) + s.uploadingQueue = persistentqueue.New(uploadingQueueStorage, log.Logger, s.uploadingHandler, queueItemLess) retryUploadingQueueStorage, err := persistentqueue.NewAnystoreStorage(db, "filesync/retry_uploading", makeQueueItem) if err != nil { return fmt.Errorf("init retry uploading queue storage: %w", err) } - s.retryUploadingQueue = persistentqueue.New(retryUploadingQueueStorage, log.Logger, s.retryingHandler, persistentqueue.WithRetryPause(loopTimeout)) + s.retryUploadingQueue = persistentqueue.New(retryUploadingQueueStorage, log.Logger, s.retryingHandler, queueItemLess, persistentqueue.WithRetryPause(loopTimeout)) deletionQueueStorage, err := persistentqueue.NewAnystoreStorage(db, "filesync/deletion", makeDeletionQueueItem) if err != nil { return fmt.Errorf("init deletion queue storage: %w", err) } - s.deletionQueue = persistentqueue.New(deletionQueueStorage, log.Logger, s.deletionHandler) + s.deletionQueue = persistentqueue.New(deletionQueueStorage, log.Logger, s.deletionHandler, nil) retryDeletionQueueStorage, err := persistentqueue.NewAnystoreStorage(db, "filesync/retry_deletion", makeDeletionQueueItem) if err != nil { return fmt.Errorf("init retry deletion queue storage: %w", err) } - s.retryDeletionQueue = persistentqueue.New(retryDeletionQueueStorage, log.Logger, s.retryDeletionHandler, persistentqueue.WithRetryPause(loopTimeout)) + s.retryDeletionQueue = persistentqueue.New(retryDeletionQueueStorage, log.Logger, s.retryDeletionHandler, nil, persistentqueue.WithRetryPause(loopTimeout)) return } diff --git a/core/files/filesync/mock_filesync/mock_FileSync.go b/core/files/filesync/mock_filesync/mock_FileSync.go index 810d84a09a..b3716f01e1 100644 --- a/core/files/filesync/mock_filesync/mock_FileSync.go +++ 
b/core/files/filesync/mock_filesync/mock_FileSync.go @@ -29,17 +29,17 @@ func (_m *MockFileSync) EXPECT() *MockFileSync_Expecter { return &MockFileSync_Expecter{mock: &_m.Mock} } -// AddFile provides a mock function with given fields: fileObjectId, fileId, uploadedByUser, imported -func (_m *MockFileSync) AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser bool, imported bool) error { - ret := _m.Called(fileObjectId, fileId, uploadedByUser, imported) +// AddFile provides a mock function with given fields: fileObjectId, fileId, uploadedByUser, imported, prioritizeVariantId, score +func (_m *MockFileSync) AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser bool, imported bool, prioritizeVariantId domain.FileId, score int) error { + ret := _m.Called(fileObjectId, fileId, uploadedByUser, imported, prioritizeVariantId, score) if len(ret) == 0 { panic("no return value specified for AddFile") } var r0 error - if rf, ok := ret.Get(0).(func(string, domain.FullFileId, bool, bool) error); ok { - r0 = rf(fileObjectId, fileId, uploadedByUser, imported) + if rf, ok := ret.Get(0).(func(string, domain.FullFileId, bool, bool, domain.FileId, int) error); ok { + r0 = rf(fileObjectId, fileId, uploadedByUser, imported, prioritizeVariantId, score) } else { r0 = ret.Error(0) } @@ -57,13 +57,15 @@ type MockFileSync_AddFile_Call struct { // - fileId domain.FullFileId // - uploadedByUser bool // - imported bool -func (_e *MockFileSync_Expecter) AddFile(fileObjectId interface{}, fileId interface{}, uploadedByUser interface{}, imported interface{}) *MockFileSync_AddFile_Call { - return &MockFileSync_AddFile_Call{Call: _e.mock.On("AddFile", fileObjectId, fileId, uploadedByUser, imported)} +// - prioritizeVariantId domain.FileId +// - score int +func (_e *MockFileSync_Expecter) AddFile(fileObjectId interface{}, fileId interface{}, uploadedByUser interface{}, imported interface{}, prioritizeVariantId interface{}, score interface{}) *MockFileSync_AddFile_Call { 
+ return &MockFileSync_AddFile_Call{Call: _e.mock.On("AddFile", fileObjectId, fileId, uploadedByUser, imported, prioritizeVariantId, score)} } -func (_c *MockFileSync_AddFile_Call) Run(run func(fileObjectId string, fileId domain.FullFileId, uploadedByUser bool, imported bool)) *MockFileSync_AddFile_Call { +func (_c *MockFileSync_AddFile_Call) Run(run func(fileObjectId string, fileId domain.FullFileId, uploadedByUser bool, imported bool, prioritizeVariantId domain.FileId, score int)) *MockFileSync_AddFile_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(domain.FullFileId), args[2].(bool), args[3].(bool)) + run(args[0].(string), args[1].(domain.FullFileId), args[2].(bool), args[3].(bool), args[4].(domain.FileId), args[5].(int)) }) return _c } @@ -73,7 +75,7 @@ func (_c *MockFileSync_AddFile_Call) Return(err error) *MockFileSync_AddFile_Cal return _c } -func (_c *MockFileSync_AddFile_Call) RunAndReturn(run func(string, domain.FullFileId, bool, bool) error) *MockFileSync_AddFile_Call { +func (_c *MockFileSync_AddFile_Call) RunAndReturn(run func(string, domain.FullFileId, bool, bool, domain.FileId, int) error) *MockFileSync_AddFile_Call { _c.Call.Return(run) return _c } diff --git a/core/files/filesync/queue.go b/core/files/filesync/queue.go index 54d070c120..eb2ff93849 100644 --- a/core/files/filesync/queue.go +++ b/core/files/filesync/queue.go @@ -20,6 +20,11 @@ type QueueItem struct { Timestamp int64 AddedByUser bool Imported bool + + // VariantId tells uploader to upload specific branch of file tree + VariantId domain.FileId + // Score affects priority + Score int } func (it *QueueItem) Validate() error { @@ -33,6 +38,9 @@ func (it *QueueItem) Validate() error { } func (it *QueueItem) Key() string { + if it.VariantId != "" { + return it.ObjectId + "/" + it.VariantId.String() + } return it.ObjectId } @@ -43,6 +51,9 @@ func (it *QueueItem) FullFileId() domain.FullFileId { } } -func (it *QueueItem) Less(other *QueueItem) bool { - return 
it.Timestamp < other.Timestamp +func queueItemLess(one, other *QueueItem) bool { + if one.Score != other.Score { + return one.Score > other.Score + } + return one.Timestamp < other.Timestamp } diff --git a/core/files/filesync/upload.go b/core/files/filesync/upload.go index dbfd83ed84..ba3c29a8c3 100644 --- a/core/files/filesync/upload.go +++ b/core/files/filesync/upload.go @@ -25,7 +25,7 @@ import ( "github.com/anyproto/anytype-heart/util/persistentqueue" ) -func (s *fileSync) AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool) (err error) { +func (s *fileSync) AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool, prioritizeVariantId domain.FileId, score int) (err error) { it := &QueueItem{ ObjectId: fileObjectId, SpaceId: fileId.SpaceId, @@ -33,6 +33,8 @@ func (s *fileSync) AddFile(fileObjectId string, fileId domain.FullFileId, upload AddedByUser: uploadedByUser, Imported: imported, Timestamp: time.Now().UnixMilli(), + VariantId: prioritizeVariantId, + Score: score, } err = it.Validate() if err != nil { @@ -98,7 +100,7 @@ func (s *fileSync) handleLimitReachedError(err error, it *QueueItem) *errLimitRe func (s *fileSync) uploadingHandler(ctx context.Context, it *QueueItem) (persistentqueue.Action, error) { spaceId, fileId := it.SpaceId, it.FileId - err := s.uploadFile(ctx, spaceId, fileId, it.ObjectId) + err := s.uploadFile(ctx, it) if errors.Is(err, context.Canceled) { return persistentqueue.ActionRetry, nil } @@ -124,16 +126,22 @@ func (s *fileSync) uploadingHandler(ctx context.Context, it *QueueItem) (persist return s.addToRetryUploadingQueue(it), nil } - err = s.runOnUploadedHook(it.ObjectId, it.FullFileId()) - if isObjectDeletedError(err) { - return persistentqueue.ActionDone, s.DeleteFile(it.ObjectId, it.FullFileId()) - } - if err != nil { - return s.addToRetryUploadingQueue(it), err + // Mark as uploaded only if the root of the file tree is uploaded. 
+ if it.VariantId == "" { + err = s.runOnUploadedHook(it.ObjectId, it.FullFileId()) + if isObjectDeletedError(err) { + return persistentqueue.ActionDone, s.DeleteFile(it.ObjectId, it.FullFileId()) + } + if err != nil { + return s.addToRetryUploadingQueue(it), err + } + + s.updateSpaceUsageInformation(spaceId) + return persistentqueue.ActionDone, s.removeFromUploadingQueues(it.ObjectId) } - s.updateSpaceUsageInformation(spaceId) - return persistentqueue.ActionDone, s.removeFromUploadingQueues(it.ObjectId) + s.updateSpaceUsageInformation(spaceId) + return persistentqueue.ActionDone, nil } func (s *fileSync) addToRetryUploadingQueue(it *QueueItem) persistentqueue.Action { @@ -147,7 +155,7 @@ func (s *fileSync) addToRetryUploadingQueue(it *QueueItem) persistentqueue.Actio func (s *fileSync) retryingHandler(ctx context.Context, it *QueueItem) (persistentqueue.Action, error) { spaceId, fileId := it.SpaceId, it.FileId - err := s.uploadFile(ctx, spaceId, fileId, it.ObjectId) + err := s.uploadFile(ctx, it) if errors.Is(err, context.Canceled) { return persistentqueue.ActionRetry, nil } @@ -207,7 +215,7 @@ func (s *fileSync) removeFromUploadingQueues(objectId string) error { func (s *fileSync) UploadSynchronously(ctx context.Context, spaceId string, fileId domain.FileId) error { // TODO After we migrate to storing invites as file objects in tech space, we should update their sync status // via OnUploadStarted and OnUploaded callbacks - err := s.uploadFile(ctx, spaceId, fileId, "") + err := s.uploadFile(ctx, &QueueItem{SpaceId: spaceId, FileId: fileId}) if err != nil { return err } @@ -275,24 +283,29 @@ func (e *errLimitReached) Error() string { return "file upload limit has been reached" } -func (s *fileSync) uploadFile(ctx context.Context, spaceID string, fileId domain.FileId, objectId string) error { +func (s *fileSync) uploadFile(ctx context.Context, it *QueueItem) error { ctx = filestorage.ContextWithDoNotCache(ctx) - log.Debug("uploading file", zap.String("fileId", 
fileId.String())) + log.Debug("uploading file", zap.String("fileId", it.FileId.String())) + + branchToUpload := it.FileId + if it.VariantId != "" { + branchToUpload = it.VariantId + } - blocksAvailability, err := s.blocksAvailabilityCache.Get(context.Background(), fileId.String()) + blocksAvailability, err := s.blocksAvailabilityCache.Get(context.Background(), it.FileId.String()) if err != nil || blocksAvailability.totalBytesToUpload() == 0 { // Ignore error from cache and calculate blocks availability - blocksAvailability, err = s.checkBlocksAvailability(ctx, spaceID, fileId) + blocksAvailability, err = s.checkBlocksAvailability(ctx, it.SpaceId, branchToUpload) if err != nil { return fmt.Errorf("check blocks availability: %w", err) } - err = s.blocksAvailabilityCache.Set(context.Background(), fileId.String(), blocksAvailability) + err = s.blocksAvailabilityCache.Set(context.Background(), it.FileId.String(), blocksAvailability) if err != nil { - log.Error("cache blocks availability", zap.String("fileId", fileId.String()), zap.Error(err)) + log.Error("cache blocks availability", zap.String("fileId", it.FileId.String()), zap.Error(err)) } } - stat, err := s.getAndUpdateSpaceStat(ctx, spaceID) + stat, err := s.getAndUpdateSpaceStat(ctx, it.SpaceId) if err != nil { return fmt.Errorf("get space stat: %w", err) } @@ -300,9 +313,9 @@ func (s *fileSync) uploadFile(ctx context.Context, spaceID string, fileId domain bytesLeft := stat.AccountBytesLimit - stat.TotalBytesUsage if blocksAvailability.totalBytesToUpload() > bytesLeft { // Unbind file just in case - err := s.rpcStore.DeleteFiles(ctx, spaceID, fileId) + err := s.rpcStore.DeleteFiles(ctx, it.SpaceId, it.FileId) if err != nil { - log.Error("calculate limits: unbind off-limit file", zap.String("fileId", fileId.String()), zap.Error(err)) + log.Error("calculate limits: unbind off-limit file", zap.String("fileId", it.FileId.String()), zap.Error(err)) } return &errLimitReached{ fileSize: 
blocksAvailability.totalBytesToUpload(), @@ -310,15 +323,16 @@ func (s *fileSync) uploadFile(ctx context.Context, spaceID string, fileId domain totalBytesUsage: stat.TotalBytesUsage, } } - if objectId != "" { - err = s.runOnUploadStartedHook(objectId, domain.FullFileId{FileId: fileId, SpaceId: spaceID}) + if it.ObjectId != "" { + err = s.runOnUploadStartedHook(it.ObjectId, domain.FullFileId{FileId: it.FileId, SpaceId: it.SpaceId}) if isObjectDeletedError(err) { return err } } var totalBytesUploaded int - err = s.walkFileBlocks(ctx, spaceID, fileId, func(fileBlocks []blocks.Block) error { - bytesToUpload, err := s.uploadOrBindBlocks(ctx, spaceID, fileId, fileBlocks, blocksAvailability.cidsToUpload) + + err = s.walkFileBlocks(ctx, it.SpaceId, branchToUpload, func(fileBlocks []blocks.Block) error { + bytesToUpload, err := s.uploadOrBindBlocks(ctx, it.SpaceId, it.FileId, fileBlocks, blocksAvailability.cidsToUpload) if err != nil { return fmt.Errorf("select blocks to upload: %w", err) } @@ -328,9 +342,9 @@ func (s *fileSync) uploadFile(ctx context.Context, spaceID string, fileId domain if err != nil { if strings.Contains(err.Error(), fileprotoerr.ErrSpaceLimitExceeded.Error()) { // Unbind partially uploaded file - err := s.rpcStore.DeleteFiles(ctx, spaceID, fileId) + err := s.rpcStore.DeleteFiles(ctx, it.SpaceId, it.FileId) if err != nil { - log.Error("upload: unbind off-limit file", zap.String("fileId", fileId.String()), zap.Error(err)) + log.Error("upload: unbind off-limit file", zap.String("fileId", it.FileId.String()), zap.Error(err)) } return &errLimitReached{ fileSize: blocksAvailability.totalBytesToUpload(), @@ -341,13 +355,13 @@ func (s *fileSync) uploadFile(ctx context.Context, spaceID string, fileId domain return fmt.Errorf("walk file blocks: %w", err) } - err = s.blocksAvailabilityCache.Delete(context.Background(), fileId.String()) + err = s.blocksAvailabilityCache.Delete(context.Background(), it.FileId.String()) if err != nil { - log.Warn("delete blocks 
availability cache entry", zap.String("fileId", fileId.String()), zap.Error(err)) + log.Warn("delete blocks availability cache entry", zap.String("fileId", it.FileId.String()), zap.Error(err)) } - err = s.isLimitReachedErrorLogged.Delete(context.Background(), fileId.String()) + err = s.isLimitReachedErrorLogged.Delete(context.Background(), it.FileId.String()) if err != nil { - log.Warn("delete limit reached error logged", zap.String("fileId", fileId.String()), zap.Error(err)) + log.Warn("delete limit reached error logged", zap.String("fileId", it.FileId.String()), zap.Error(err)) } return nil diff --git a/core/files/filesync/upload_test.go b/core/files/filesync/upload_test.go index e4c7221562..04ebd60fd7 100644 --- a/core/files/filesync/upload_test.go +++ b/core/files/filesync/upload_test.go @@ -77,7 +77,7 @@ func TestFileSync_AddFile(t *testing.T) { fileId, fileNode := fx.givenFileAddedToDAG(t) spaceId := "space1" - require.NoError(t, fx.AddFile("objectId1", domain.FullFileId{SpaceId: spaceId, FileId: fileId}, true, false)) + require.NoError(t, fx.AddFile("objectId1", domain.FullFileId{SpaceId: spaceId, FileId: fileId}, true, false, "", 0)) fx.waitLimitReachedEvent(t, time.Second*5) fx.waitEmptyQueue(t, fx.uploadingQueue, time.Second*5) @@ -104,7 +104,7 @@ func TestFileSync_AddFile(t *testing.T) { return spacestorage.ErrTreeStorageAlreadyDeleted } - require.NoError(t, fx.AddFile("objectId1", domain.FullFileId{SpaceId: spaceId, FileId: fileId}, true, false)) + require.NoError(t, fx.AddFile("objectId1", domain.FullFileId{SpaceId: spaceId, FileId: fileId}, true, false, "", 0)) fx.waitEmptyQueue(t, fx.uploadingQueue, 100*time.Millisecond) assert.Equal(t, 1, fx.uploadingQueue.NumProcessedItems()) @@ -145,7 +145,7 @@ func (fx *fixture) givenFileAddedToDAG(t *testing.T) (domain.FileId, ipld.Node) func (fx *fixture) givenFileUploaded(t *testing.T, spaceId string, fileId domain.FileId) { // Add file to upload queue - err := fx.AddFile("objectId1", 
domain.FullFileId{SpaceId: spaceId, FileId: fileId}, true, false) + err := fx.AddFile("objectId1", domain.FullFileId{SpaceId: spaceId, FileId: fileId}, true, false, "", 0) require.NoError(t, err) fx.waitEmptyQueue(t, fx.uploadingQueue, time.Second*1) @@ -174,7 +174,7 @@ func TestUpload(t *testing.T) { spaceId := "space1" fileId, _ := fx.givenFileAddedToDAG(t) - err := fx.uploadFile(ctx, spaceId, fileId, "") + err := fx.uploadFile(ctx, &QueueItem{SpaceId: spaceId, FileId: fileId}) require.NoError(t, err) assert.True(t, fx.rpcStore.Stats().BlocksAdded() > 0) @@ -188,13 +188,13 @@ func TestUpload(t *testing.T) { spaceId := "space1" fileId, _ := fx.givenFileAddedToDAG(t) - err := fx.uploadFile(ctx, spaceId, fileId, "") + err := fx.uploadFile(ctx, &QueueItem{SpaceId: spaceId, FileId: fileId}) require.NoError(t, err) assert.True(t, fx.rpcStore.Stats().BlocksAdded() > 0) assert.True(t, fx.rpcStore.Stats().CidsBinded() == 0) - err = fx.uploadFile(ctx, spaceId, fileId, "") + err = fx.uploadFile(ctx, &QueueItem{SpaceId: spaceId, FileId: fileId}) require.NoError(t, err) assert.True(t, fx.rpcStore.Stats().CidsBinded() == fx.rpcStore.Stats().BlocksAdded()) @@ -207,7 +207,7 @@ func TestUpload(t *testing.T) { spaceId := "space1" fileId, _ := fx.givenFileAddedToDAG(t) - err := fx.uploadFile(ctx, spaceId, fileId, "") + err := fx.uploadFile(ctx, &QueueItem{SpaceId: spaceId, FileId: fileId}) var errLimit *errLimitReached require.ErrorAs(t, err, &errLimit) }) @@ -239,7 +239,7 @@ func TestUpload(t *testing.T) { go func(fileId domain.FileId) { defer wg.Done() - err := fx.uploadFile(ctx, spaceId, fileId, "") + err := fx.uploadFile(ctx, &QueueItem{SpaceId: spaceId, FileId: fileId}) if err != nil { errorsLock.Lock() errors = append(errors, err) diff --git a/core/files/reconciler/reconciler.go b/core/files/reconciler/reconciler.go index 18524282c5..313b91a656 100644 --- a/core/files/reconciler/reconciler.go +++ b/core/files/reconciler/reconciler.go @@ -89,7 +89,7 @@ func (r *reconciler) 
Init(a *app.App) error { if err != nil { return fmt.Errorf("init rebindQueueStore: %w", err) } - r.rebindQueue = persistentqueue.New(rebindQueueStore, log, r.rebindHandler) + r.rebindQueue = persistentqueue.New(rebindQueueStore, log, r.rebindHandler, nil) r.isStartedStore = keyvaluestore.NewJsonFromCollection[bool](provider.GetSystemCollection()) @@ -165,7 +165,7 @@ func (r *reconciler) rebindHandler(ctx context.Context, item *queueItem) (persis } log.Warn("add to queue", zap.String("objectId", item.ObjectId), zap.String("fileId", item.FileId.FileId.String())) - err = r.fileSync.AddFile(item.ObjectId, item.FileId, false, false) + err = r.fileSync.AddFile(item.ObjectId, item.FileId, false, false, "", 0) if err != nil { return persistentqueue.ActionRetry, fmt.Errorf("upload file: %w", err) } diff --git a/core/files/reconciler/reconciler_test.go b/core/files/reconciler/reconciler_test.go index 2a2b367e52..db542d8317 100644 --- a/core/files/reconciler/reconciler_test.go +++ b/core/files/reconciler/reconciler_test.go @@ -208,7 +208,7 @@ func TestRebindQueue(t *testing.T) { fx := newFixture(t) fx.fileSync.EXPECT().CancelDeletion("objectId1", testFullFileId).Return(nil) - fx.fileSync.EXPECT().AddFile("objectId1", testFullFileId, false, false).Return(nil) + fx.fileSync.EXPECT().AddFile("objectId1", testFullFileId, false, false, domain.FileId(""), 0).Return(nil) err := fx.rebindQueue.Add(&queueItem{ ObjectId: "objectId1", diff --git a/util/persistentqueue/messagequeue.go b/util/persistentqueue/messagequeue.go new file mode 100644 index 0000000000..44aa1c7471 --- /dev/null +++ b/util/persistentqueue/messagequeue.go @@ -0,0 +1,127 @@ +package persistentqueue + +import ( + "context" + "fmt" + "sync" + + "github.com/cheggaaa/mb/v3" +) + +var errClosed = fmt.Errorf("closed") + +type messageQueue[T any] interface { + add(item T) error + initWith(items []T) error + waitOne() (T, error) + close() error +} + +type simpleMessageQueue[T any] struct { + ctx context.Context + batcher 
*mb.MB[T] +} + +var _ messageQueue[any] = &simpleMessageQueue[any]{} + +func newSimpleMessageQueue[T any](ctx context.Context) *simpleMessageQueue[T] { + return &simpleMessageQueue[T]{ + ctx: ctx, + batcher: mb.New[T](0), + } +} + +func (s *simpleMessageQueue[T]) add(item T) error { + return s.batcher.Add(s.ctx, item) +} + +func (s *simpleMessageQueue[T]) initWith(items []T) error { + for _, it := range items { + err := s.add(it) + if err != nil { + return err + } + } + return nil +} + +func (s *simpleMessageQueue[T]) waitOne() (T, error) { + return s.batcher.WaitOne(s.ctx) +} + +func (s *simpleMessageQueue[T]) close() error { + return s.batcher.Close() +} + +type priorityMessageQueue[T any] struct { + closed bool + cond *sync.Cond + items *priorityQueue[T] +} + +var _ messageQueue[any] = &priorityMessageQueue[any]{} + +func newPriorityMessageQueue[T any](lessFunc func(one, other T) bool) *priorityMessageQueue[T] { + q := &priorityMessageQueue[T]{ + cond: &sync.Cond{ + L: &sync.Mutex{}, + }, + items: newPriorityQueue[T](lessFunc), + } + return q +} + +func (q *priorityMessageQueue[T]) add(item T) error { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + if q.closed { + return errClosed + } + q.items.push(item) + q.cond.Signal() + return nil +} + +func (q *priorityMessageQueue[T]) initWith(items []T) error { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + q.items.initWith(items) + return nil +} + +func (q *priorityMessageQueue[T]) close() error { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + if q.closed { + return nil + } + q.closed = true + q.cond.Broadcast() + return nil +} + +func (q *priorityMessageQueue[T]) waitOne() (T, error) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + var defaultVal T + + for { + if q.closed { + return defaultVal, errClosed + } + if q.items.Len() == 0 { + q.cond.Wait() + } else { + break + } + } + it, ok := q.items.pop() + if !ok { + return defaultVal, fmt.Errorf("integrity violation") + } + return it, nil +} diff --git 
a/util/persistentqueue/messagequeue_test.go b/util/persistentqueue/messagequeue_test.go new file mode 100644 index 0000000000..7985bdfb81 --- /dev/null +++ b/util/persistentqueue/messagequeue_test.go @@ -0,0 +1,122 @@ +package persistentqueue + +import ( + "math/rand" + "sort" + "sync" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMessageQueue(t *testing.T) { + lessFunc := func(one, other int) bool { + return one > other + } + + t.Run("in one goroutine", func(t *testing.T) { + q := newPriorityMessageQueue[int](lessFunc) + + for i := 0; i < 100; i++ { + err := q.add(i) + require.NoError(t, err) + } + + for i := 0; i < 100; i++ { + got, err := q.waitOne() + require.NoError(t, err) + + want := 99 - i + assert.Equal(t, want, got) + } + }) + + t.Run("in multiple goroutines", func(t *testing.T) { + q := newPriorityMessageQueue[int](lessFunc) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := q.add(i) + require.NoError(t, err) + }() + } + + results := make(chan int, 100) + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + got, err := q.waitOne() + require.NoError(t, err) + + results <- got + }() + } + wg.Wait() + + close(results) + + resultsSlice := lo.ChannelToSlice(results) + + want := make([]int, 100) + for i := 0; i < 100; i++ { + want[i] = i + } + + assert.ElementsMatch(t, want, resultsSlice) + }) +} + +type testItemWithPriority struct { + Value int + Timestamp int64 + Priority int +} + +func TestMessageQueueWithComplexPriority(t *testing.T) { + lessFunc := func(one, other testItemWithPriority) bool { + if one.Priority != other.Priority { + return one.Priority > other.Priority + } + return one.Timestamp < other.Timestamp + } + + q := newPriorityMessageQueue[testItemWithPriority](lessFunc) + + const n = 100 + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer 
wg.Done() + err := q.add(testItemWithPriority{ + Value: i, + Timestamp: time.Now().UnixMilli(), + Priority: rand.Intn(10), + }) + require.NoError(t, err) + }() + } + wg.Wait() + + got := make([]testItemWithPriority, 0, n) + for i := 0; i < n; i++ { + it, err := q.waitOne() + require.NoError(t, err) + + got = append(got, it) + } + + gotIsSorted := sort.SliceIsSorted(got, func(i, j int) bool { + one, other := got[i], got[j] + return lessFunc(one, other) + }) + + assert.True(t, gotIsSorted) +} diff --git a/util/persistentqueue/priorityqueue.go b/util/persistentqueue/priorityqueue.go new file mode 100644 index 0000000000..9c083ad945 --- /dev/null +++ b/util/persistentqueue/priorityqueue.go @@ -0,0 +1,55 @@ +package persistentqueue + +import "container/heap" + +type priorityQueue[T any] struct { + items []T + lessFunc func(one, other T) bool +} + +func newPriorityQueue[T any](lessFunc func(one, other T) bool) *priorityQueue[T] { + return &priorityQueue[T]{ + lessFunc: lessFunc, + } +} + +func (q *priorityQueue[T]) push(item T) { + heap.Push(q, item) +} + +func (q *priorityQueue[T]) initWith(items []T) { + q.items = append(q.items, items...) 
+ heap.Init(q) +} + +func (q *priorityQueue[T]) pop() (T, bool) { + if q.Len() == 0 { + var defaultValue T + return defaultValue, false + } + it := heap.Pop(q).(T) + return it, true +} + +func (q *priorityQueue[T]) Len() int { + return len(q.items) +} + +func (q *priorityQueue[T]) Less(i, j int) bool { + return q.lessFunc(q.items[i], q.items[j]) +} + +func (q *priorityQueue[T]) Swap(i, j int) { + q.items[i], q.items[j] = q.items[j], q.items[i] +} + +func (q *priorityQueue[T]) Push(x any) { + item := x.(T) + q.items = append(q.items, item) +} + +func (q *priorityQueue[T]) Pop() any { + item := q.items[len(q.items)-1] + q.items = q.items[0 : len(q.items)-1] + return item +} diff --git a/util/persistentqueue/priorityqueue_test.go b/util/persistentqueue/priorityqueue_test.go new file mode 100644 index 0000000000..acf4010559 --- /dev/null +++ b/util/persistentqueue/priorityqueue_test.go @@ -0,0 +1,86 @@ +package persistentqueue + +import ( + "sort" + "testing" + "testing/quick" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" +) + +func TestPriorityQueue(t *testing.T) { + lessFunc := func(one, other int) bool { + return one > other + } + t.Run("consecutive insertions", func(t *testing.T) { + pq := newPriorityQueue[int](lessFunc) + + const n = 100 + for i := 0; i < n; i++ { + pq.push(i) + } + + for i := 0; i < n; i++ { + got, ok := pq.pop() + require.True(t, ok) + want := n - 1 - i + require.Equal(t, want, got) + } + }) + + t.Run("property testing", func(t *testing.T) { + f := func(input []int) bool { + want := slices.Clone(input) + // descending order + sort.Slice(want, func(i, j int) bool { + return want[i] > want[j] + }) + pq := newPriorityQueue[int](lessFunc) + for _, in := range input { + pq.push(in) + } + + got := make([]int, 0, len(input)) + for range input { + gotItem, ok := pq.pop() + if !ok { + return false + } + got = append(got, gotItem) + } + + return assert.Equal(t, want, got) + } + + err := 
quick.Check(f, nil) + require.NoError(t, err) + }) + + t.Run("initWith: property testing", func(t *testing.T) { + f := func(input []int) bool { + want := slices.Clone(input) + // descending order + sort.Slice(want, func(i, j int) bool { + return want[i] > want[j] + }) + pq := newPriorityQueue[int](lessFunc) + pq.initWith(input) + + got := make([]int, 0, len(input)) + for range input { + gotItem, ok := pq.pop() + if !ok { + return false + } + got = append(got, gotItem) + } + + return assert.Equal(t, want, got) + } + + err := quick.Check(f, nil) + require.NoError(t, err) + }) +} diff --git a/util/persistentqueue/queue.go b/util/persistentqueue/queue.go index a59ac9b6d7..1a6162c0ba 100644 --- a/util/persistentqueue/queue.go +++ b/util/persistentqueue/queue.go @@ -9,7 +9,6 @@ import ( "sync/atomic" "time" - "github.com/cheggaaa/mb/v3" "go.uber.org/zap" ) @@ -57,7 +56,7 @@ type Queue[T Item] struct { storage Storage[T] logger *zap.Logger - batcher *mb.MB[T] + batcher messageQueue[T] handler HandlerFunc[T] options options handledItems uint32 @@ -97,16 +96,17 @@ func WithContext(ctx context.Context) Option { } } +// New creates new queue backed by specified storage. 
If priorityQueueLessFunc is provided, a priority queue is used as the underlying message queue func New[T Item]( storage Storage[T], logger *zap.Logger, handler HandlerFunc[T], + priorityQueueLessFunc func(one, other T) bool, opts ...Option, ) *Queue[T] { q := &Queue[T]{ storage: storage, logger: logger, - batcher: mb.New[T](0), handler: handler, set: make(map[string]struct{}), options: options{}, @@ -120,6 +120,13 @@ func New[T Item]( rootCtx = q.options.ctx } q.ctx, q.ctxCancel = context.WithCancel(rootCtx) + + if priorityQueueLessFunc != nil { + q.batcher = newPriorityMessageQueue[T](priorityQueueLessFunc) + } else { + q.batcher = newSimpleMessageQueue[T](q.ctx) + } + err := q.restore() if err != nil { q.logger.Error("can't restore queue", zap.Error(err)) @@ -165,7 +172,7 @@ func (q *Queue[T]) loop() { } func (q *Queue[T]) handleNext() error { - it, err := q.batcher.WaitOne(q.ctx) + it, err := q.batcher.waitOne() if err != nil { return fmt.Errorf("wait one: %w", err) } @@ -188,7 +195,7 @@ func (q *Queue[T]) handleNext() error { // So just notify waiters that the item has been processed q.notifyWaiters() q.lock.Unlock() - addErr := q.batcher.Add(q.ctx, it) + addErr := q.batcher.add(it) if addErr != nil { return fmt.Errorf("add to queue: %w", addErr) } @@ -212,9 +219,7 @@ func (q *Queue[T]) restore() error { return fmt.Errorf("list items from storage: %w", err) } - sortItems(items) - - err = q.batcher.Add(q.ctx, items...) 
+ err = q.batcher.initWith(items) if err != nil { return fmt.Errorf("add to queue: %w", err) } @@ -224,24 +229,10 @@ func (q *Queue[T]) restore() error { return nil } -func sortItems[T Item](items []T) { - if len(items) == 0 { - return - } - var itemIface Item = items[0] - if _, ok := itemIface.(OrderedItem); ok { - sort.Slice(items, func(i, j int) bool { - var left Item = items[i] - var right Item = items[j] - return left.(OrderedItem).Less(right.(OrderedItem)) - }) - } -} - // Close stops queue processing and waits for the last in-process item to be processed func (q *Queue[T]) Close() error { q.ctxCancel() - err := q.batcher.Close() + err := q.batcher.close() q.lock.Lock() isStarted := q.isStarted q.lock.Unlock() @@ -267,7 +258,7 @@ func (q *Queue[T]) Add(item T) error { q.set[item.Key()] = struct{}{} q.lock.Unlock() - err = q.batcher.Add(q.ctx, item) + err = q.batcher.add(item) if err != nil { return err } diff --git a/util/persistentqueue/queue_test.go b/util/persistentqueue/queue_test.go index e458d5dd9b..ace55bafe4 100644 --- a/util/persistentqueue/queue_test.go +++ b/util/persistentqueue/queue_test.go @@ -60,7 +60,7 @@ func newTestQueueWithDb(t *testing.T, db anystore.DB, handlerFunc HandlerFunc[*t storage, err := NewAnystoreStorage[*testItem](db, "test_queue", makeTestItem) require.NoError(t, err) - q := New[*testItem](storage, log.Desugar(), handlerFunc) + q := New[*testItem](storage, log.Desugar(), handlerFunc, nil) return q } @@ -373,7 +373,7 @@ func TestWithHandlerTickPeriod(t *testing.T) { tickerPeriod := 50 * time.Millisecond q := New[*testItem](storage, log.Desugar(), func(ctx context.Context, item *testItem) (Action, error) { return ActionRetry, nil - }, WithRetryPause(tickerPeriod)) + }, nil, WithRetryPause(tickerPeriod)) err = q.Add(&testItem{Id: "1", Timestamp: 1, Data: "data1"}) require.NoError(t, err) @@ -399,7 +399,7 @@ func TestWithHandlerTickPeriod(t *testing.T) { tickerPeriod := 50 * time.Millisecond q := New[*testItem](storage, 
log.Desugar(), func(ctx context.Context, item *testItem) (Action, error) { return ActionDone, nil - }, WithRetryPause(tickerPeriod)) + }, nil, WithRetryPause(tickerPeriod)) err = q.Add(&testItem{Id: "1", Timestamp: 1, Data: "data1"}) require.NoError(t, err) @@ -435,7 +435,7 @@ func TestWithContext(t *testing.T) { assert.Equal(t, "testValue", val) close(wait) return ActionDone, nil - }, WithContext(testRootCtx)) + }, nil, WithContext(testRootCtx)) q.Run() t.Cleanup(func() { q.Close()