updated dao fail/retry handling
This commit is contained in:
+23
-10
@@ -5,6 +5,7 @@ package daos
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/pocketbase/dbx"
|
||||
"github.com/pocketbase/pocketbase/models"
|
||||
@@ -20,8 +21,10 @@ func New(db dbx.Builder) *Dao {
|
||||
// async and sync db builders.
|
||||
func NewMultiDB(concurrentDB, nonconcurrentDB dbx.Builder) *Dao {
|
||||
return &Dao{
|
||||
concurrentDB: concurrentDB,
|
||||
nonconcurrentDB: nonconcurrentDB,
|
||||
concurrentDB: concurrentDB,
|
||||
nonconcurrentDB: nonconcurrentDB,
|
||||
MaxLockRetries: 8,
|
||||
ModelQueryTimeout: 90 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +35,14 @@ type Dao struct {
|
||||
concurrentDB dbx.Builder
|
||||
nonconcurrentDB dbx.Builder
|
||||
|
||||
// MaxLockRetries specifies the default max "database is locked" auto retry attempts.
|
||||
MaxLockRetries int
|
||||
|
||||
// ModelQueryTimeout is the default max duration of a running ModelQuery().
|
||||
//
|
||||
// This field has no effect if an explicit query context is already specified.
|
||||
ModelQueryTimeout time.Duration
|
||||
|
||||
BeforeCreateFunc func(eventDao *Dao, m models.Model) error
|
||||
AfterCreateFunc func(eventDao *Dao, m models.Model)
|
||||
BeforeUpdateFunc func(eventDao *Dao, m models.Model) error
|
||||
@@ -63,15 +74,17 @@ func (dao *Dao) NonconcurrentDB() dbx.Builder {
|
||||
return dao.nonconcurrentDB
|
||||
}
|
||||
|
||||
// ModelQuery creates a new query with preset Select and From fields
|
||||
// based on the provided model argument.
|
||||
// ModelQuery creates a new preconfigured select query with preset
|
||||
// SELECT, FROM and other common fields based on the provided model.
|
||||
func (dao *Dao) ModelQuery(m models.Model) *dbx.SelectQuery {
|
||||
tableName := m.TableName()
|
||||
|
||||
return dao.DB().
|
||||
Select("{{" + tableName + "}}.*").
|
||||
From(tableName).
|
||||
WithExecHook(onLockErrorRetry)
|
||||
WithBuildHook(func(query *dbx.Query) {
|
||||
query.WithExecHook(execLockRetry(dao.ModelQueryTimeout, dao.MaxLockRetries))
|
||||
})
|
||||
}
|
||||
|
||||
// FindById finds a single db record with the specified id and
|
||||
@@ -189,7 +202,7 @@ func (dao *Dao) Delete(m models.Model) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
}, defaultMaxRetries)
|
||||
})
|
||||
}
|
||||
|
||||
// Save upserts (update or create if primary key is not set) the provided model.
|
||||
@@ -197,12 +210,12 @@ func (dao *Dao) Save(m models.Model) error {
|
||||
if m.IsNew() {
|
||||
return dao.lockRetry(func(retryDao *Dao) error {
|
||||
return retryDao.create(m)
|
||||
}, defaultMaxRetries)
|
||||
})
|
||||
}
|
||||
|
||||
return dao.lockRetry(func(retryDao *Dao) error {
|
||||
return retryDao.update(m)
|
||||
}, defaultMaxRetries)
|
||||
})
|
||||
}
|
||||
|
||||
func (dao *Dao) update(m models.Model) error {
|
||||
@@ -296,7 +309,7 @@ func (dao *Dao) create(m models.Model) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dao *Dao) lockRetry(op func(retryDao *Dao) error, maxRetries int) error {
|
||||
func (dao *Dao) lockRetry(op func(retryDao *Dao) error) error {
|
||||
retryDao := dao
|
||||
|
||||
return baseLockRetry(func(attempt int) error {
|
||||
@@ -310,5 +323,5 @@ func (dao *Dao) lockRetry(op func(retryDao *Dao) error, maxRetries int) error {
|
||||
}
|
||||
|
||||
return op(retryDao)
|
||||
}, maxRetries)
|
||||
}, dao.MaxLockRetries)
|
||||
}
|
||||
|
||||
+12
-14
@@ -8,26 +8,24 @@ import (
|
||||
"github.com/pocketbase/dbx"
|
||||
)
|
||||
|
||||
const defaultQueryTimeout time.Duration = 2 * time.Minute
|
||||
// default retries intervals (in ms)
|
||||
var defaultRetryIntervals = []int{100, 250, 350, 500, 700, 1000}
|
||||
|
||||
const defaultMaxRetries int = 10
|
||||
|
||||
var defaultRetryIntervals = []int{100, 250, 350, 500, 700, 1000, 1200, 1500}
|
||||
|
||||
func onLockErrorRetry(s *dbx.SelectQuery, op func() error) error {
|
||||
return baseLockRetry(func(attempt int) error {
|
||||
// load a default timeout context if not set explicitly
|
||||
if s.Context() == nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultQueryTimeout)
|
||||
func execLockRetry(timeout time.Duration, maxRetries int) dbx.ExecHookFunc {
|
||||
return func(q *dbx.Query, op func() error) error {
|
||||
if q.Context() == nil {
|
||||
cancelCtx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer func() {
|
||||
cancel()
|
||||
s.WithContext(nil) // reset
|
||||
q.WithContext(nil) // reset
|
||||
}()
|
||||
s.WithContext(ctx)
|
||||
q.WithContext(cancelCtx)
|
||||
}
|
||||
|
||||
return op()
|
||||
}, defaultMaxRetries)
|
||||
return baseLockRetry(func(attempt int) error {
|
||||
return op()
|
||||
}, maxRetries)
|
||||
}
|
||||
}
|
||||
|
||||
func baseLockRetry(op func(attempt int) error, maxRetries int) error {
|
||||
|
||||
@@ -6,12 +6,12 @@ import (
|
||||
)
|
||||
|
||||
func TestGetDefaultRetryInterval(t *testing.T) {
|
||||
if i := getDefaultRetryInterval(-1); i.Milliseconds() != 1500 {
|
||||
t.Fatalf("Expected 1500ms, got %v", i)
|
||||
if i := getDefaultRetryInterval(-1); i.Milliseconds() != 1000 {
|
||||
t.Fatalf("Expected 1000ms, got %v", i)
|
||||
}
|
||||
|
||||
if i := getDefaultRetryInterval(999); i.Milliseconds() != 1500 {
|
||||
t.Fatalf("Expected 1500ms, got %v", i)
|
||||
if i := getDefaultRetryInterval(999); i.Milliseconds() != 1000 {
|
||||
t.Fatalf("Expected 1000ms, got %v", i)
|
||||
}
|
||||
|
||||
if i := getDefaultRetryInterval(3); i.Milliseconds() != 500 {
|
||||
|
||||
+51
-49
@@ -23,68 +23,70 @@ func (dao *Dao) RecordQuery(collection *models.Collection) *dbx.SelectQuery {
|
||||
return dao.DB().
|
||||
Select(selectCols).
|
||||
From(tableName).
|
||||
WithExecHook(onLockErrorRetry).
|
||||
WithOneHook(func(s *dbx.SelectQuery, a any, op func(b any) error) error {
|
||||
switch v := a.(type) {
|
||||
case *models.Record:
|
||||
if v == nil {
|
||||
return op(a)
|
||||
}
|
||||
WithBuildHook(func(query *dbx.Query) {
|
||||
query.WithExecHook(execLockRetry(dao.ModelQueryTimeout, dao.MaxLockRetries)).
|
||||
WithOneHook(func(q *dbx.Query, a any, op func(b any) error) error {
|
||||
switch v := a.(type) {
|
||||
case *models.Record:
|
||||
if v == nil {
|
||||
return op(a)
|
||||
}
|
||||
|
||||
row := dbx.NullStringMap{}
|
||||
if err := op(&row); err != nil {
|
||||
return err
|
||||
}
|
||||
row := dbx.NullStringMap{}
|
||||
if err := op(&row); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
record := models.NewRecordFromNullStringMap(collection, row)
|
||||
record := models.NewRecordFromNullStringMap(collection, row)
|
||||
|
||||
*v = *record
|
||||
*v = *record
|
||||
|
||||
return nil
|
||||
default:
|
||||
return op(a)
|
||||
}
|
||||
}).
|
||||
WithAllHook(func(s *dbx.SelectQuery, sliceA any, op func(sliceB any) error) error {
|
||||
switch v := sliceA.(type) {
|
||||
case *[]*models.Record:
|
||||
if v == nil {
|
||||
return op(sliceA)
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return op(a)
|
||||
}
|
||||
}).
|
||||
WithAllHook(func(q *dbx.Query, sliceA any, op func(sliceB any) error) error {
|
||||
switch v := sliceA.(type) {
|
||||
case *[]*models.Record:
|
||||
if v == nil {
|
||||
return op(sliceA)
|
||||
}
|
||||
|
||||
rows := []dbx.NullStringMap{}
|
||||
if err := op(&rows); err != nil {
|
||||
return err
|
||||
}
|
||||
rows := []dbx.NullStringMap{}
|
||||
if err := op(&rows); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
records := models.NewRecordsFromNullStringMaps(collection, rows)
|
||||
records := models.NewRecordsFromNullStringMaps(collection, rows)
|
||||
|
||||
*v = records
|
||||
*v = records
|
||||
|
||||
return nil
|
||||
case *[]models.Record:
|
||||
if v == nil {
|
||||
return op(sliceA)
|
||||
}
|
||||
return nil
|
||||
case *[]models.Record:
|
||||
if v == nil {
|
||||
return op(sliceA)
|
||||
}
|
||||
|
||||
rows := []dbx.NullStringMap{}
|
||||
if err := op(&rows); err != nil {
|
||||
return err
|
||||
}
|
||||
rows := []dbx.NullStringMap{}
|
||||
if err := op(&rows); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
records := models.NewRecordsFromNullStringMaps(collection, rows)
|
||||
records := models.NewRecordsFromNullStringMaps(collection, rows)
|
||||
|
||||
nonPointers := make([]models.Record, len(records))
|
||||
for i, r := range records {
|
||||
nonPointers[i] = *r
|
||||
}
|
||||
nonPointers := make([]models.Record, len(records))
|
||||
for i, r := range records {
|
||||
nonPointers[i] = *r
|
||||
}
|
||||
|
||||
*v = nonPointers
|
||||
*v = nonPointers
|
||||
|
||||
return nil
|
||||
default:
|
||||
return op(sliceA)
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return op(sliceA)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user