support Uploader.MaxConcurrency=1 and updated tests

This commit is contained in:
Gani Georgiev 2025-10-02 20:52:36 +03:00
parent 44289a93a2
commit 77c05dbd2a
3 changed files with 124 additions and 112 deletions

View File

@ -3,6 +3,8 @@
- ⚠️ Excluded the `lost+found` directory from the backups ([#7208](https://github.com/pocketbase/pocketbase/pull/7208); thanks @lbndev). - ⚠️ Excluded the `lost+found` directory from the backups ([#7208](https://github.com/pocketbase/pocketbase/pull/7208); thanks @lbndev).
_If for some reason you want to keep it, you can restore it by editing the `e.Exclude` list of the `OnBackupCreate` and `OnBackupRestore` hooks._ _If for some reason you want to keep it, you can restore it by editing the `e.Exclude` list of the `OnBackupCreate` and `OnBackupRestore` hooks._
- Minor tests improvements (disabled initial superuser creation for the test app to avoid cluttering the std output, added more tests for the s3.Uploader.MaxConcurrency, etc.).
- Updated Go dependencies. - Updated Go dependencies.

View File

@ -334,10 +334,10 @@ func (u *Uploader) multipartUpload(ctx context.Context, initPart []byte, optReqF
var g errgroup.Group var g errgroup.Group
g.SetLimit(u.MaxConcurrency) g.SetLimit(u.MaxConcurrency)
totalParallel := u.MaxConcurrency totalWorkers := u.MaxConcurrency
if len(initPart) != 0 { if len(initPart) != 0 {
totalParallel-- totalWorkers--
initPartNumber := u.lastPartNumber initPartNumber := u.lastPartNumber
g.Go(func() error { g.Go(func() error {
mp, err := u.uploadPart(ctx, initPartNumber, initPart, optReqFuncs...) mp, err := u.uploadPart(ctx, initPartNumber, initPart, optReqFuncs...)
@ -353,7 +353,9 @@ func (u *Uploader) multipartUpload(ctx context.Context, initPart []byte, optReqF
}) })
} }
for i := 0; i < totalParallel; i++ { totalWorkers = max(totalWorkers, 1)
for i := 0; i < totalWorkers; i++ {
g.Go(func() error { g.Go(func() error {
for { for {
part, num, err := u.nextPart() part, num, err := u.nextPart()

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"io" "io"
"net/http" "net/http"
"strconv"
"strings" "strings"
"testing" "testing"
@ -124,20 +125,24 @@ func TestUploaderSingleUpload(t *testing.T) {
func TestUploaderMultipartUploadSuccess(t *testing.T) { func TestUploaderMultipartUploadSuccess(t *testing.T) {
t.Parallel() t.Parallel()
httpClient := tests.NewClient( maxConcurrencies := []int{-1, 0, 1, 10}
&tests.RequestStub{
Method: http.MethodPost, for _, mc := range maxConcurrencies {
URL: "http://test_bucket.example.com/test_key?uploads", t.Run("MaxConcurrency_"+strconv.Itoa(mc), func(t *testing.T) {
Match: func(req *http.Request) bool { httpClient := tests.NewClient(
return tests.ExpectHeaders(req.Header, map[string]string{ &tests.RequestStub{
"x-amz-meta-a": "123", Method: http.MethodPost,
"x-amz-meta-b": "456", URL: "http://test_bucket.example.com/test_key?uploads",
"test_header": "test", Match: func(req *http.Request) bool {
"Authorization": "^.+Credential=123/.+$", return tests.ExpectHeaders(req.Header, map[string]string{
}) "x-amz-meta-a": "123",
}, "x-amz-meta-b": "456",
Response: &http.Response{ "test_header": "test",
Body: io.NopCloser(strings.NewReader(` "Authorization": "^.+Credential=123/.+$",
})
},
Response: &http.Response{
Body: io.NopCloser(strings.NewReader(`
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult> <InitiateMultipartUploadResult>
<Bucket>test_bucket</Bucket> <Bucket>test_bucket</Bucket>
@ -145,108 +150,111 @@ func TestUploaderMultipartUploadSuccess(t *testing.T) {
<UploadId>test_id</UploadId> <UploadId>test_id</UploadId>
</InitiateMultipartUploadResult> </InitiateMultipartUploadResult>
`)), `)),
}, },
}, },
&tests.RequestStub{ &tests.RequestStub{
Method: http.MethodPut, Method: http.MethodPut,
URL: "http://test_bucket.example.com/test_key?partNumber=1&uploadId=test_id", URL: "http://test_bucket.example.com/test_key?partNumber=1&uploadId=test_id",
Match: func(req *http.Request) bool { Match: func(req *http.Request) bool {
body, err := io.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return false return false
} }
return string(body) == "abc" && tests.ExpectHeaders(req.Header, map[string]string{ return string(body) == "abc" && tests.ExpectHeaders(req.Header, map[string]string{
"Content-Length": "3", "Content-Length": "3",
"test_header": "test", "test_header": "test",
"Authorization": "^.+Credential=123/.+$", "Authorization": "^.+Credential=123/.+$",
}) })
}, },
Response: &http.Response{ Response: &http.Response{
Header: http.Header{"Etag": []string{"etag1"}}, Header: http.Header{"Etag": []string{"etag1"}},
}, },
}, },
&tests.RequestStub{ &tests.RequestStub{
Method: http.MethodPut, Method: http.MethodPut,
URL: "http://test_bucket.example.com/test_key?partNumber=2&uploadId=test_id", URL: "http://test_bucket.example.com/test_key?partNumber=2&uploadId=test_id",
Match: func(req *http.Request) bool { Match: func(req *http.Request) bool {
body, err := io.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return false return false
} }
return string(body) == "def" && tests.ExpectHeaders(req.Header, map[string]string{ return string(body) == "def" && tests.ExpectHeaders(req.Header, map[string]string{
"Content-Length": "3", "Content-Length": "3",
"test_header": "test", "test_header": "test",
"Authorization": "^.+Credential=123/.+$", "Authorization": "^.+Credential=123/.+$",
}) })
}, },
Response: &http.Response{ Response: &http.Response{
Header: http.Header{"Etag": []string{"etag2"}}, Header: http.Header{"Etag": []string{"etag2"}},
}, },
}, },
&tests.RequestStub{ &tests.RequestStub{
Method: http.MethodPut, Method: http.MethodPut,
URL: "http://test_bucket.example.com/test_key?partNumber=3&uploadId=test_id", URL: "http://test_bucket.example.com/test_key?partNumber=3&uploadId=test_id",
Match: func(req *http.Request) bool { Match: func(req *http.Request) bool {
body, err := io.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return false return false
} }
return string(body) == "g" && tests.ExpectHeaders(req.Header, map[string]string{ return string(body) == "g" && tests.ExpectHeaders(req.Header, map[string]string{
"Content-Length": "1", "Content-Length": "1",
"test_header": "test", "test_header": "test",
"Authorization": "^.+Credential=123/.+$", "Authorization": "^.+Credential=123/.+$",
}) })
}, },
Response: &http.Response{ Response: &http.Response{
Header: http.Header{"Etag": []string{"etag3"}}, Header: http.Header{"Etag": []string{"etag3"}},
}, },
}, },
&tests.RequestStub{ &tests.RequestStub{
Method: http.MethodPost, Method: http.MethodPost,
URL: "http://test_bucket.example.com/test_key?uploadId=test_id", URL: "http://test_bucket.example.com/test_key?uploadId=test_id",
Match: func(req *http.Request) bool { Match: func(req *http.Request) bool {
body, err := io.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return false return false
} }
expected := `<CompleteMultipartUpload><Part><ETag>etag1</ETag><PartNumber>1</PartNumber></Part><Part><ETag>etag2</ETag><PartNumber>2</PartNumber></Part><Part><ETag>etag3</ETag><PartNumber>3</PartNumber></Part></CompleteMultipartUpload>` expected := `<CompleteMultipartUpload><Part><ETag>etag1</ETag><PartNumber>1</PartNumber></Part><Part><ETag>etag2</ETag><PartNumber>2</PartNumber></Part><Part><ETag>etag3</ETag><PartNumber>3</PartNumber></Part></CompleteMultipartUpload>`
return strings.Contains(string(body), expected) && tests.ExpectHeaders(req.Header, map[string]string{ return strings.Contains(string(body), expected) && tests.ExpectHeaders(req.Header, map[string]string{
"test_header": "test", "test_header": "test",
"Authorization": "^.+Credential=123/.+$", "Authorization": "^.+Credential=123/.+$",
}) })
}, },
}, },
) )
uploader := &s3.Uploader{ uploader := &s3.Uploader{
S3: &s3.S3{ S3: &s3.S3{
Client: httpClient, Client: httpClient,
Region: "test_region", Region: "test_region",
Bucket: "test_bucket", Bucket: "test_bucket",
Endpoint: "http://example.com", Endpoint: "http://example.com",
AccessKey: "123", AccessKey: "123",
SecretKey: "abc", SecretKey: "abc",
}, },
Key: "test_key", Key: "test_key",
Payload: strings.NewReader("abcdefg"), Payload: strings.NewReader("abcdefg"),
Metadata: map[string]string{"a": "123", "b": "456"}, Metadata: map[string]string{"a": "123", "b": "456"},
MinPartSize: 3, MinPartSize: 3,
} MaxConcurrency: mc,
}
err := uploader.Upload(context.Background(), func(r *http.Request) { err := uploader.Upload(context.Background(), func(r *http.Request) {
r.Header.Set("test_header", "test") r.Header.Set("test_header", "test")
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = httpClient.AssertNoRemaining() err = httpClient.AssertNoRemaining()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
}
})
} }
} }