Skip to content

Commit 63eda99

Browse files
committed
rework locking when uploading
Signed-off-by: jkoberg <[email protected]>
1 parent 1ff975d commit 63eda99

File tree

4 files changed

+69
-60
lines changed

4 files changed

+69
-60
lines changed

pkg/storage/utils/decomposedfs/decomposedfs.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
300300
); err != nil {
301301
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event")
302302
}
303+
304+
/* LETS KEEP THIS COMMENTED UNTIL VIRUSSCANNING IS BACKMERGED
303305
case events.VirusscanFinished:
304306
if ev.ErrorMsg != "" {
305307
// scan failed somehow
@@ -385,7 +387,6 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
385387
fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
386388
continue
387389
}
388-
389390
default:
390391
// uploadid is not empty -> this is an async upload
391392
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
@@ -410,7 +411,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
410411
411412
// remove cache entry in gateway
412413
fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
413-
414+
*/
414415
default:
415416
log.Error().Interface("event", ev).Msg("Unknown event")
416417
}

pkg/storage/utils/decomposedfs/upload/processing.go

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ package upload
2121
import (
2222
"context"
2323
"encoding/json"
24+
"fmt"
2425
iofs "io/fs"
2526
"os"
2627
"path/filepath"
2728
"strconv"
29+
"strings"
2830
"time"
2931

3032
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
@@ -42,6 +44,7 @@ import (
4244
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
4345
"github.com/cs3org/reva/v2/pkg/storagespace"
4446
"github.com/cs3org/reva/v2/pkg/utils"
47+
"github.com/gofrs/flock"
4548
"github.com/google/uuid"
4649
"github.com/pkg/errors"
4750
tusd "github.com/tus/tusd/pkg/handler"
@@ -234,7 +237,7 @@ func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot stri
234237
}
235238

236239
// CreateNodeForUpload will create the target node for the Upload
237-
func CreateNodeForUpload(upload *Upload) (*node.Node, error) {
240+
func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Node, error) {
238241
fi, err := os.Stat(upload.binPath)
239242
if err != nil {
240243
return nil, err
@@ -262,19 +265,29 @@ func CreateNodeForUpload(upload *Upload) (*node.Node, error) {
262265
return nil, err
263266
}
264267

268+
var lock *flock.Flock
269+
defer filelocks.ReleaseLock(lock)
270+
265271
switch n.ID {
266272
case "":
267-
err = initNewNode(upload, n, uint64(fsize))
273+
lock, err = initNewNode(upload, n, uint64(fsize))
268274
default:
269-
err = updateExistingNode(upload, n, spaceID, uint64(fsize))
275+
lock, err = updateExistingNode(upload, n, spaceID, uint64(fsize))
270276
}
271-
272277
if err != nil {
273278
return nil, err
274279
}
275280

276-
// create/update node info
277-
if err := n.WriteAllNodeMetadata(); err != nil {
281+
// overwrite technical information
282+
initAttrs[xattrs.ParentidAttr] = n.ParentID
283+
initAttrs[xattrs.NameAttr] = n.Name
284+
initAttrs[xattrs.BlobIDAttr] = n.BlobID
285+
initAttrs[xattrs.BlobsizeAttr] = strconv.FormatInt(n.Blobsize, 10)
286+
initAttrs[xattrs.StatusPrefix] = node.ProcessingStatus
287+
288+
// update node metadata with new blobid etc
289+
err = n.SetXattrsWithLock(initAttrs, lock)
290+
if err != nil {
278291
return nil, errors.Wrap(err, "Decomposedfs: could not write metadata")
279292
}
280293

@@ -284,56 +297,61 @@ func CreateNodeForUpload(upload *Upload) (*node.Node, error) {
284297
return nil, err
285298
}
286299

287-
return n, n.MarkProcessing()
300+
return n, nil
288301
}
289302

290-
func initNewNode(upload *Upload, n *node.Node, fsize uint64) error {
303+
func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*flock.Flock, error) {
291304
n.ID = uuid.New().String()
292305

293306
// create folder structure (if needed)
294307
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
295-
return err
308+
return nil, err
296309
}
297310

298311
if _, err := os.Create(n.InternalPath()); err != nil {
299-
return err
312+
return nil, err
313+
}
314+
315+
lock, err := filelocks.AcquireWriteLock(n.InternalPath())
316+
if err != nil {
317+
// we cannot acquire a lock - we error for safety
318+
return lock, err
300319
}
301320

302321
if _, err := node.CheckQuota(n.SpaceRoot, false, 0, fsize); err != nil {
303-
return err
322+
return lock, err
304323
}
305324

306325
// link child name to parent if it is new
307326
childNameLink := filepath.Join(n.ParentInternalPath(), n.Name)
308-
var link string
309327
link, err := os.Readlink(childNameLink)
310328
if err == nil && link != "../"+n.ID {
311329
if err := os.Remove(childNameLink); err != nil {
312-
return errors.Wrap(err, "Decomposedfs: could not remove symlink child entry")
330+
return lock, errors.Wrap(err, "Decomposedfs: could not remove symlink child entry")
313331
}
314332
}
315333
if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID {
316334
relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
317335
if err = os.Symlink(relativeNodePath, childNameLink); err != nil {
318-
return errors.Wrap(err, "Decomposedfs: could not symlink child entry")
336+
return lock, errors.Wrap(err, "Decomposedfs: could not symlink child entry")
319337
}
320338
}
321339

322340
// on a new file the sizeDiff is the fileSize
323341
upload.sizeDiff = int64(fsize)
324342
upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.sizeDiff))
325-
return nil
343+
return lock, nil
326344
}
327345

328-
func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) error {
346+
func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*flock.Flock, error) {
329347
old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false)
330348
if _, err := node.CheckQuota(n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil {
331-
return err
349+
return nil, err
332350
}
333351

334352
vfi, err := os.Stat(old.InternalPath())
335353
if err != nil {
336-
return err
354+
return nil, err
337355
}
338356

339357
// When the if-match header was set we need to check if the
@@ -342,9 +360,9 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint
342360
targetEtag, err := node.CalculateEtag(n.ID, vfi.ModTime())
343361
switch {
344362
case err != nil:
345-
return errtypes.InternalError(err.Error())
363+
return nil, errtypes.InternalError(err.Error())
346364
case ifMatch != targetEtag:
347-
return errtypes.Aborted("etag mismatch")
365+
return nil, errtypes.Aborted("etag mismatch")
348366
}
349367
}
350368

@@ -358,36 +376,29 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint
358376
lock, err := filelocks.AcquireWriteLock(targetPath)
359377
if err != nil {
360378
// we cannot acquire a lock - we error for safety
361-
return err
379+
return nil, err
362380
}
363-
defer filelocks.ReleaseLock(lock)
364381

365-
// This move drops all metadata!!! We copy it below with CopyMetadata
366-
if err = os.Rename(targetPath, upload.versionsPath); err != nil {
367-
return err
382+
// create version node
383+
if _, err := os.Create(upload.versionsPath); err != nil {
384+
return lock, err
368385
}
369386

370-
if _, err := os.Create(targetPath); err != nil {
371-
return err
387+
// copy blob metadata to version node
388+
if err := xattrs.CopyMetadataWithSourceLock(targetPath, upload.versionsPath, func(attributeName string) bool {
389+
return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) ||
390+
attributeName == xattrs.BlobIDAttr ||
391+
attributeName == xattrs.BlobsizeAttr
392+
}, lock); err != nil {
393+
return lock, err
372394
}
373395

374-
// copy grant and arbitrary metadata
375-
// NOTE: now restoring an older revision might bring back a grant that was removed!
376-
if err := xattrs.CopyMetadata(upload.versionsPath, targetPath, func(attributeName string) bool {
377-
return true
378-
// TODO determine all attributes that must be copied, currently we just copy all and overwrite changed properties
379-
/*
380-
[>
381-
return strings.HasPrefix(attributeName, xattrs.GrantPrefix) || // for grants
382-
strings.HasPrefix(attributeName, xattrs.MetadataPrefix) || // for arbitrary metadata
383-
strings.HasPrefix(attributeName, xattrs.FavPrefix) || // for favorites
384-
strings.HasPrefix(attributeName, xattrs.SpaceNameAttr) || // for a shared file
385-
*/
386-
}); err != nil {
387-
return err
396+
// keep mtime from previous version
397+
if err := os.Chtimes(upload.versionsPath, vfi.ModTime(), vfi.ModTime()); err != nil {
398+
return lock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err))
388399
}
389400

390-
return nil
401+
return lock, nil
391402
}
392403

393404
// lookupNode looks up nodes by path.

pkg/storage/utils/decomposedfs/upload/upload.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
4343
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
4444
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
45+
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
4546
"github.com/cs3org/reva/v2/pkg/utils"
4647
"github.com/golang-jwt/jwt"
4748
"github.com/pkg/errors"
@@ -227,19 +228,21 @@ func (upload *Upload) FinishUpload(_ context.Context) error {
227228
}
228229
}
229230

230-
n, err := CreateNodeForUpload(upload)
231+
// update checksums
232+
attrs := map[string]string{
233+
xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)),
234+
xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)),
235+
xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)),
236+
}
237+
238+
n, err := CreateNodeForUpload(upload, attrs)
231239
if err != nil {
232240
Cleanup(upload, true, false)
233241
return err
234242
}
235243

236244
upload.Node = n
237245

238-
// now try write all checksums
239-
tryWritingChecksum(log, upload.Node, "sha1", sha1h)
240-
tryWritingChecksum(log, upload.Node, "md5", md5h)
241-
tryWritingChecksum(log, upload.Node, "adler32", adler32h)
242-
243246
if upload.pub != nil {
244247
u, _ := ctxpkg.ContextGetUser(upload.Ctx)
245248
s, err := upload.URL(upload.Ctx)
@@ -459,13 +462,3 @@ func joinurl(paths ...string) string {
459462

460463
return s.String()
461464
}
462-
463-
func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) {
464-
if err := n.SetChecksum(algo, h); err != nil {
465-
log.Err(err).
466-
Str("csType", algo).
467-
Bytes("hash", h.Sum(nil)).
468-
Msg("Decomposedfs: could not write checksum")
469-
// this is not critical, the bytes are there so we will continue
470-
}
471-
}

pkg/storage/utils/filelocks/filelocks.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ func AcquireWriteLock(file string) (*flock.Flock, error) {
164164
// ReleaseLock releases a lock from a file that was previously created
165165
// by AcquireReadLock or AcquireWriteLock.
166166
func ReleaseLock(lock *flock.Flock) error {
167+
if lock == nil {
168+
return errors.New("cannot unlock nil lock")
169+
}
170+
167171
// there is a probability that if the file can not be unlocked,
168172
// we also can not remove the file. We will only try to remove if it
169173
// was successfully unlocked.

0 commit comments

Comments
 (0)