diff --git a/app/api/controller/lfs/upload.go b/app/api/controller/lfs/upload.go
index 0e24986b47..067f0f73e6 100644
--- a/app/api/controller/lfs/upload.go
+++ b/app/api/controller/lfs/upload.go
@@ -15,25 +15,48 @@
 package lfs
 
 import (
-	"bytes"
 	"context"
 	"crypto/sha256"
 	"encoding/hex"
 	"errors"
 	"fmt"
 	"io"
 	"strings"
 	"time"
 
 	"github.com/harness/gitness/app/api/usererror"
 	"github.com/harness/gitness/app/auth"
+	"github.com/harness/gitness/blob"
 	"github.com/harness/gitness/store"
 	"github.com/harness/gitness/types"
 	"github.com/harness/gitness/types/enum"
+
+	"github.com/google/uuid"
+	"github.com/rs/zerolog/log"
+)
+
+const (
+	// lfsTempPathFormat is the blob store path template for staged uploads (%s is a random UUID).
+	lfsTempPathFormat = "lfs/tmp/%s"
 )
 
+// UploadOut is the result returned by Controller.Upload.
 type UploadOut struct {
-	ObjectPath string `json:"object_path"`
+	ObjectPath string `json:"object_path"` //nolint:tagliatelle
+}
+
+// removeTempUpload deletes the staged object at tempPath on a best-effort basis.
+// A blob.ErrNotFound result is ignored; any other failure is only logged, together
+// with the reason the cleanup was attempted, so it never masks the caller's error.
+func (c *Controller) removeTempUpload(ctx context.Context, tempPath, reason string) {
+	err := c.blobStore.Delete(ctx, tempPath)
+	if err == nil || errors.Is(err, blob.ErrNotFound) {
+		return
+	}
+
+	log.Ctx(ctx).Warn().Err(err).
+		Str("temp_path", tempPath).
+		Msgf("failed to delete temp file after %s", reason)
 }
 
 func (c *Controller) Upload(ctx context.Context,
@@ -61,28 +84,35 @@ func (c *Controller) Upload(ctx context.Context,
 		return nil, usererror.Conflict("LFS object already exists and cannot be modified")
 	}
 
+	expectedHash := strings.TrimPrefix(pointer.OId, "sha256:")
+
+	// Stage the upload under a unique temp path so that the final object path
+	// only ever receives content whose hash has been verified.
+	tempPath := fmt.Sprintf(lfsTempPathFormat, uuid.NewString())
+	finalPath := getLFSObjectPath(pointer.OId)
+
+	// Stream the content to the temp path; the tee feeds every byte read by the
+	// blob store into the hasher, so the payload is never buffered in memory.
 	limitedReader := io.LimitReader(file, pointer.Size)
-	content, err := io.ReadAll(limitedReader)
+	hasher := sha256.New()
+
+	err = c.blobStore.Upload(ctx, io.TeeReader(limitedReader, hasher), tempPath)
 	if err != nil {
-		return nil, fmt.Errorf("failed to read uploaded content: %w", err)
+		// A failed upload may leave a partial object behind.
+		c.removeTempUpload(ctx, tempPath, "upload failure")
+		return nil, fmt.Errorf("failed to upload file to temp path: %w", err)
 	}
 
-	hasher := sha256.New()
-	hasher.Write(content)
-	calculatedHash := hex.EncodeToString(hasher.Sum(nil))
-
-	expectedHash := strings.TrimPrefix(pointer.OId, "sha256:")
-
+	calculatedHash := hex.EncodeToString(hasher.Sum(nil))
 	if calculatedHash != expectedHash {
+		c.removeTempUpload(ctx, tempPath, "hash mismatch")
 		return nil, usererror.BadRequest("content hash doesn't match provided OID")
 	}
 
-	contentReader := bytes.NewReader(content)
-	objPath := getLFSObjectPath(pointer.OId)
-
-	err = c.blobStore.Upload(ctx, contentReader, objPath)
+	err = c.blobStore.Move(ctx, tempPath, finalPath)
 	if err != nil {
-		return nil, fmt.Errorf("failed to upload file: %w", err)
+		c.removeTempUpload(ctx, tempPath, "move failure")
+		return nil, fmt.Errorf("failed to move file to final path: %w", err)
 	}
 
 	now := time.Now()
@@ -101,6 +131,6 @@ func (c *Controller) Upload(ctx context.Context,
 	}
 
 	return &UploadOut{
-		ObjectPath: objPath,
+		ObjectPath: finalPath,
 	}, nil
 }
diff --git a/blob/filesystem.go b/blob/filesystem.go
index 7d17955eec..c37a19d1e6 100644
--- a/blob/filesystem.go
+++ b/blob/filesystem.go
@@ -102,3 +102,42 @@ func (c *FileSystemStore) Download(_ context.Context, filePath string) (io.ReadC
 	}
 	return io.ReadCloser(file), nil
 }
+
+// Move renames the file stored at srcPath to dstPath, creating the destination
+// directory when it is missing. It returns ErrNotFound if srcPath does not exist.
+func (c *FileSystemStore) Move(_ context.Context, srcPath, dstPath string) error {
+	srcDiskPath := fmt.Sprintf(fileDiskPathFmt, c.basePath, srcPath)
+	dstDiskPath := fmt.Sprintf(fileDiskPathFmt, c.basePath, dstPath)
+
+	// Ensure destination directory exists
+	dstDir, _ := path.Split(dstDiskPath)
+	if _, err := os.Stat(dstDir); errors.Is(err, fs.ErrNotExist) {
+		if err = os.MkdirAll(dstDir, os.ModeDir|os.ModePerm); err != nil {
+			return fmt.Errorf("failed to create destination directory: %w", err)
+		}
+	}
+
+	if err := os.Rename(srcDiskPath, dstDiskPath); err != nil {
+		// The destination directory was ensured above, so a not-exist error
+		// normally refers to the source; report it the same way Delete does.
+		if errors.Is(err, fs.ErrNotExist) {
+			return ErrNotFound
+		}
+		return fmt.Errorf("failed to move file: %w", err)
+	}
+	return nil
+}
+
+// Delete removes the file stored at filePath. It returns ErrNotFound if the
+// file does not exist.
+func (c *FileSystemStore) Delete(_ context.Context, filePath string) error {
+	fileDiskPath := fmt.Sprintf(fileDiskPathFmt, c.basePath, filePath)
+
+	if err := os.Remove(fileDiskPath); err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			return ErrNotFound
+		}
+		return fmt.Errorf("failed to delete file: %w", err)
+	}
+	return nil
+}
diff --git a/blob/filesystem_test.go b/blob/filesystem_test.go
index 6ca6fde923..d0c2368483 100644
--- a/blob/filesystem_test.go
+++ b/blob/filesystem_test.go
@@ -371,3 +371,168 @@ func TestFileSystemStore_Interface(t *testing.T) {
 
 	_ = store
 }
+
+func TestFileSystemStore_Move(t *testing.T) {
+	tempDir, err := os.MkdirTemp("", "blob-test-*")
+	if err != nil {
+		t.Fatalf("failed to create temp dir: %v", err)
+	}
+	defer os.RemoveAll(tempDir)
+
+	store := &FileSystemStore{basePath: tempDir}
+	ctx := context.Background()
+
+	tests := []struct {
+		name        string
+		srcPath     string
+		dstPath     string
+		content     string
+		expectError bool
+		errorType   error
+	}{
+		{
+			name:        "move file same directory",
+			srcPath:     "src.txt",
+			dstPath:     "dst.txt",
+			content:     "test content",
+			expectError: false,
+		},
+		{
+			name:        "move file to nested directory",
+			srcPath:     "src2.txt",
+			dstPath:     "nested/dir/dst2.txt",
+			content:     "nested content",
+			expectError: false,
+		},
+		{
+			name:        "move non-existent file",
+			srcPath:     "nonexistent.txt",
+			dstPath:     "dst.txt",
+			content:     "",
+			expectError: true,
+			errorType:   ErrNotFound,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// Setup: create source file if content is provided
+			if test.content != "" {
+				srcFullPath := filepath.Join(tempDir, test.srcPath)
+				if err := os.WriteFile(srcFullPath, []byte(test.content), 0600); err != nil {
+					t.Fatalf("failed to create source file: %v", err)
+				}
+			}
+
+			err := store.Move(ctx, test.srcPath, test.dstPath)
+
+			if test.expectError {
+				if err == nil {
+					t.Fatal("expected error but got none")
+				}
+				if test.errorType != nil && !errors.Is(err, test.errorType) {
+					t.Errorf("expected error type %v, got %v", test.errorType, err)
+				}
+				return
+			}
+
+			if err != nil {
+				t.Fatalf("unexpected error: %v", err)
+			}
+
+			// Verify source file no longer exists
+			srcFullPath := filepath.Join(tempDir, test.srcPath)
+			if _, err := os.Stat(srcFullPath); !os.IsNotExist(err) {
+				t.Error("source file should not exist after move")
+			}
+
+			// Verify destination file exists with correct content
+			dstFullPath := filepath.Join(tempDir, test.dstPath)
+			data, err := os.ReadFile(dstFullPath)
+			if err != nil {
+				t.Fatalf("failed to read destination file: %v", err)
+			}
+
+			if string(data) != test.content {
+				t.Errorf("expected content %q, got %q", test.content, string(data))
+			}
+		})
+	}
+}
+
+func TestFileSystemStore_Delete(t *testing.T) {
+	tempDir, err := os.MkdirTemp("", "blob-test-*")
+	if err != nil {
+		t.Fatalf("failed to create temp dir: %v", err)
+	}
+	defer os.RemoveAll(tempDir)
+
+	store := &FileSystemStore{basePath: tempDir}
+	ctx := context.Background()
+
+	tests := []struct {
+		name        string
+		filePath    string
+		content     string
+		expectError bool
+		errorType   error
+	}{
+		{
+			name:        "delete existing file",
+			filePath:    "to-delete.txt",
+			content:     "delete me",
+			expectError: false,
+		},
+		{
+			name:        "delete nested file",
+			filePath:    "nested/to-delete.txt",
+			content:     "delete nested",
+			expectError: false,
+		},
+		{
+			name:        "delete non-existent file",
+			filePath:    "nonexistent.txt",
+			content:     "",
+			expectError: true,
+			errorType:   ErrNotFound,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// Setup: create file if content is provided
+			if test.content != "" {
+				fullPath := filepath.Join(tempDir, test.filePath)
+				dir := filepath.Dir(fullPath)
+				if err := os.MkdirAll(dir, 0755); err != nil {
+					t.Fatalf("failed to create directory: %v", err)
+				}
+				if err := os.WriteFile(fullPath, []byte(test.content), 0600); err != nil {
+					t.Fatalf("failed to create test file: %v", err)
+				}
+			}
+
+			err := store.Delete(ctx, test.filePath)
+
+			if test.expectError {
+				if err == nil {
+					t.Fatal("expected error but got none")
+				}
+				if test.errorType != nil && !errors.Is(err, test.errorType) {
+					t.Errorf("expected error type %v, got %v", test.errorType, err)
+				}
+				return
+			}
+
+			if err != nil {
+				t.Fatalf("unexpected error: %v", err)
+			}
+
+			// Verify file no longer exists
+			fullPath := filepath.Join(tempDir, test.filePath)
+			if _, err := os.Stat(fullPath); !os.IsNotExist(err) {
+				t.Error("file should not exist after delete")
+			}
+		})
+	}
+}
diff --git a/blob/gcs.go b/blob/gcs.go
index 12a86f6600..dbb2d1ddfb 100644
--- a/blob/gcs.go
+++ b/blob/gcs.go
@@ -163,6 +163,53 @@ func (c *GCSStore) Download(ctx context.Context, filePath string) (io.ReadCloser
 	return rc, nil
 }
 
+// Move copies the object at srcPath to dstPath within the configured bucket and
+// then deletes the source. It returns ErrNotFound when the copy reports
+// storage.ErrObjectNotExist. A failure to delete the source after a successful
+// copy is only logged, so the source object may remain in the bucket.
+func (c *GCSStore) Move(ctx context.Context, srcPath, dstPath string) error {
+	gcsClient, err := c.getClient(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to retrieve latest client: %w", err)
+	}
+
+	bkt := gcsClient.Bucket(c.config.Bucket)
+	srcObj := bkt.Object(srcPath)
+	dstObj := bkt.Object(dstPath)
+
+	if _, err := dstObj.CopierFrom(srcObj).Run(ctx); err != nil {
+		if errors.Is(err, storage.ErrObjectNotExist) {
+			return ErrNotFound
+		}
+		return fmt.Errorf("failed to copy file from %q to %q: %w", srcPath, dstPath, err)
+	}
+
+	if err := srcObj.Delete(ctx); err != nil {
+		log.Ctx(ctx).Warn().Err(err).Msgf(
+			"failed to delete source file %q after successful copy to %q", srcPath, dstPath)
+	}
+
+	return nil
+}
+
+// Delete removes the object at filePath from the configured bucket. It returns
+// ErrNotFound when the deletion reports storage.ErrObjectNotExist.
+func (c *GCSStore) Delete(ctx context.Context, filePath string) error {
+	gcsClient, err := c.getClient(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to retrieve latest client: %w", err)
+	}
+
+	bkt := gcsClient.Bucket(c.config.Bucket)
+	if err := bkt.Object(filePath).Delete(ctx); err != nil {
+		if errors.Is(err, storage.ErrObjectNotExist) {
+			return ErrNotFound
+		}
+		return fmt.Errorf("failed to delete file %q: %w", filePath, err)
+	}
+	return nil
+}
+
 func createNewImpersonatedClient(ctx context.Context, cfg Config) (*storage.Client, error) {
 	// Use workload identity impersonation default credentials (GKE environment)
 	ts, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
diff --git a/blob/interface.go b/blob/interface.go
index db9d8d62cd..d0e4e8c58a 100644
--- a/blob/interface.go
+++ b/blob/interface.go
@@ -35,4 +35,12 @@ type Store interface {
 
 	// Download returns a reader for a file in the blob store.
 	Download(ctx context.Context, filePath string) (io.ReadCloser, error)
+
+	// Move moves a file from srcPath to dstPath within the blob store.
+	// Implementations return ErrNotFound if srcPath does not exist.
+	Move(ctx context.Context, srcPath, dstPath string) error
+
+	// Delete removes a file from the blob store.
+	// Implementations return ErrNotFound if filePath does not exist.
+	Delete(ctx context.Context, filePath string) error
 }