Skip to content

GO-5419 upload smaller images first #2317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: go-5025-remove-badger-totally
Choose a base branch
from
2 changes: 1 addition & 1 deletion core/files/fileobject/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *service) migrateDeriveObject(ctx context.Context, space clientspace.Spa
err = nil
}

err = s.addToSyncQueue(id, fullFileId, false, false)
err = s.addToSyncQueue(id, fullFileId, false, false, "", 0)
if err != nil {
return fmt.Errorf("add to sync queue: %w", err)
}
Expand Down
39 changes: 34 additions & 5 deletions core/files/fileobject/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -34,6 +35,7 @@ import (
"github.com/anyproto/anytype-heart/pkg/lib/datastore/anystoreprovider"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/pkg/lib/mill"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/space"
"github.com/anyproto/anytype-heart/space/clientspace"
Expand Down Expand Up @@ -146,6 +148,7 @@ func (s *service) Init(a *app.App) error {
migrationQueueStore,
log.Desugar(),
s.migrationQueueHandler,
nil,
persistentqueue.WithContext(migrationQueueCtx),
)
return nil
Expand Down Expand Up @@ -233,7 +236,7 @@ func (s *service) ensureNotSyncedFilesAddedToQueue() error {
fullId := extractFullFileIdFromDetails(record.Details)
if record.Details.GetString(bundle.RelationKeyCreator) == s.accountService.MyParticipantId(fullId.SpaceId) {
id := record.Details.GetString(bundle.RelationKeyId)
err := s.addToSyncQueue(id, fullId, false, false)
err := s.addToSyncQueue(id, fullId, false, false, "", 0)
if err != nil {
log.Errorf("add to sync queue: %v", err)
}
Expand All @@ -260,7 +263,7 @@ func (s *service) EnsureFileAddedToSyncQueue(id domain.FullID, details *domain.D
SpaceId: id.SpaceID,
FileId: domain.FileId(details.GetString(bundle.RelationKeyFileId)),
}
err := s.addToSyncQueue(id.ObjectID, fullId, false, false)
err := s.addToSyncQueue(id.ObjectID, fullId, false, false, "", 0)
return err
}

Expand All @@ -285,6 +288,11 @@ func (s *service) InitEmptyFileState(st *state.State) {
fileblocks.InitEmptyFileState(st)
}

type imageVariant struct {
variantId domain.FileId
size int64
}

func (s *service) Create(ctx context.Context, spaceId string, req filemodels.CreateRequest) (id string, object *domain.Details, err error) {
if !req.AsyncMetadataIndexing && len(req.FileVariants) == 0 {
return "", nil, fmt.Errorf("file variants are not provided")
Expand All @@ -299,7 +307,28 @@ func (s *service) Create(ctx context.Context, spaceId string, req filemodels.Cre
if err != nil {
return "", nil, fmt.Errorf("create in space: %w", err)
}
err = s.addToSyncQueue(id, domain.FullFileId{SpaceId: space.Id(), FileId: req.FileId}, true, req.ObjectOrigin.IsImported())

var imageVariants []imageVariant
for _, variant := range req.FileVariants {
if variant.Mill == mill.ImageResizeId {
imageVariants = append(imageVariants, imageVariant{
variantId: domain.FileId(variant.Hash),
size: variant.Size_,
})
}
}
sort.Slice(imageVariants, func(i, j int) bool {
return imageVariants[i].size < imageVariants[j].size
})
for idx, variant := range imageVariants {
priority := len(imageVariants) - idx
err = s.addToSyncQueue(id, domain.FullFileId{SpaceId: space.Id(), FileId: req.FileId}, true, req.ObjectOrigin.IsImported(), variant.variantId, priority)
if err != nil {
return "", nil, fmt.Errorf("add image variant to sync queue: %w", err)
}
}

err = s.addToSyncQueue(id, domain.FullFileId{SpaceId: space.Id(), FileId: req.FileId}, true, req.ObjectOrigin.IsImported(), "", 0)
if err != nil {
return "", nil, fmt.Errorf("add to sync queue: %w", err)
}
Expand Down Expand Up @@ -423,8 +452,8 @@ func (s *service) CreateFromImport(fileId domain.FullFileId, origin objectorigin
return fileObjectId, nil
}

func (s *service) addToSyncQueue(objectId string, fileId domain.FullFileId, uploadedByUser bool, imported bool) error {
if err := s.fileSync.AddFile(objectId, fileId, uploadedByUser, imported); err != nil {
func (s *service) addToSyncQueue(objectId string, fileId domain.FullFileId, uploadedByUser bool, imported bool, prioritizeVariantId domain.FileId, priority int) error {
if err := s.fileSync.AddFile(objectId, fileId, uploadedByUser, imported, prioritizeVariantId, priority); err != nil {
return fmt.Errorf("add file to sync queue: %w", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion core/files/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestFileAdd(t *testing.T) {
})

t.Run("check that file is uploaded to backup node", func(t *testing.T) {
err := fx.fileSyncService.AddFile("objectId1", domain.FullFileId{SpaceId: spaceId, FileId: got.FileId}, true, false)
err := fx.fileSyncService.AddFile("objectId1", domain.FullFileId{SpaceId: spaceId, FileId: got.FileId}, true, false, "", 0)
require.NoError(t, err)
<-uploaded
infos, err := fx.rpcStore.FilesInfo(ctx, spaceId, got.FileId)
Expand Down
10 changes: 5 additions & 5 deletions core/files/filesync/filesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type LimitCallback func(fileObjectId string, fileId domain.FullFileId, bytesLeft
type DeleteCallback func(fileObjectId domain.FullFileId)

type FileSync interface {
AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool) (err error)
AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool, prioritizeVariantId domain.FileId, score int) (err error)
UploadSynchronously(ctx context.Context, spaceId string, fileId domain.FileId) error
OnUploadStarted(StatusCallback)
OnUploaded(StatusCallback)
Expand Down Expand Up @@ -119,25 +119,25 @@ func (s *fileSync) Init(a *app.App) (err error) {
if err != nil {
return fmt.Errorf("init uploading queue storage: %w", err)
}
s.uploadingQueue = persistentqueue.New(uploadingQueueStorage, log.Logger, s.uploadingHandler)
s.uploadingQueue = persistentqueue.New(uploadingQueueStorage, log.Logger, s.uploadingHandler, queueItemLess)

retryUploadingQueueStorage, err := persistentqueue.NewAnystoreStorage(db, "filesync/retry_uploading", makeQueueItem)
if err != nil {
return fmt.Errorf("init retry uploading queue storage: %w", err)
}
s.retryUploadingQueue = persistentqueue.New(retryUploadingQueueStorage, log.Logger, s.retryingHandler, persistentqueue.WithRetryPause(loopTimeout))
s.retryUploadingQueue = persistentqueue.New(retryUploadingQueueStorage, log.Logger, s.retryingHandler, queueItemLess, persistentqueue.WithRetryPause(loopTimeout))

deletionQueueStorage, err := persistentqueue.NewAnystoreStorage(db, "filesync/deletion", makeDeletionQueueItem)
if err != nil {
return fmt.Errorf("init deletion queue storage: %w", err)
}
s.deletionQueue = persistentqueue.New(deletionQueueStorage, log.Logger, s.deletionHandler)
s.deletionQueue = persistentqueue.New(deletionQueueStorage, log.Logger, s.deletionHandler, nil)

retryDeletionQueueStorage, err := persistentqueue.NewAnystoreStorage(db, "filesync/retry_deletion", makeDeletionQueueItem)
if err != nil {
return fmt.Errorf("init retry deletion queue storage: %w", err)
}
s.retryDeletionQueue = persistentqueue.New(retryDeletionQueueStorage, log.Logger, s.retryDeletionHandler, persistentqueue.WithRetryPause(loopTimeout))
s.retryDeletionQueue = persistentqueue.New(retryDeletionQueueStorage, log.Logger, s.retryDeletionHandler, nil, persistentqueue.WithRetryPause(loopTimeout))

return
}
Expand Down
22 changes: 12 additions & 10 deletions core/files/filesync/mock_filesync/mock_FileSync.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions core/files/filesync/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ type QueueItem struct {
Timestamp int64
AddedByUser bool
Imported bool

// VariantId tells uploader to upload specific branch of file tree
VariantId domain.FileId
// Score affects priority
Score int
}

func (it *QueueItem) Validate() error {
Expand All @@ -33,6 +38,9 @@ func (it *QueueItem) Validate() error {
}

func (it *QueueItem) Key() string {
if it.VariantId != "" {
return it.ObjectId + "/" + it.VariantId.String()
}
return it.ObjectId
}

Expand All @@ -43,6 +51,9 @@ func (it *QueueItem) FullFileId() domain.FullFileId {
}
}

func (it *QueueItem) Less(other *QueueItem) bool {
return it.Timestamp < other.Timestamp
func queueItemLess(one, other *QueueItem) bool {
if one.Score != other.Score {
return one.Score > other.Score
}
return one.Timestamp < other.Timestamp
}
Loading
Loading