[#1187] added Dao query semaphore and base fail/retry
This commit is contained in:
+113
-17
@@ -4,13 +4,19 @@
|
||||
package daos
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pocketbase/dbx"
|
||||
"github.com/pocketbase/pocketbase/models"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
const DefaultMaxFailRetries = 5
|
||||
|
||||
// New creates a new Dao instance with the provided db builder.
|
||||
func New(db dbx.Builder) *Dao {
|
||||
return &Dao{
|
||||
@@ -21,7 +27,9 @@ func New(db dbx.Builder) *Dao {
|
||||
// Dao handles various db operations.
|
||||
// Think of Dao as a repository and service layer in one.
|
||||
type Dao struct {
|
||||
db dbx.Builder
|
||||
db dbx.Builder
|
||||
sem *semaphore.Weighted
|
||||
mux sync.RWMutex
|
||||
|
||||
BeforeCreateFunc func(eventDao *Dao, m models.Model) error
|
||||
AfterCreateFunc func(eventDao *Dao, m models.Model)
|
||||
@@ -36,11 +44,51 @@ func (dao *Dao) DB() dbx.Builder {
|
||||
return dao.db
|
||||
}
|
||||
|
||||
// Block acquires a lock and blocks all other go routines that uses
|
||||
// the Dao instance until dao.Continue() is called, effectively making
|
||||
// the concurrent requests to perform synchronous db operations.
|
||||
//
|
||||
// This method should be used only as a last resort as a workaround
|
||||
// for the SQLITE_BUSY error when mixing read&write in a transaction.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// func someLongRunningTransaction() error {
|
||||
// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
// defer cancel()
|
||||
// if err := app.Dao().Block(ctx); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// defer app.Dao().Continue()
|
||||
//
|
||||
// return app.Dao().RunInTransaction(func (txDao *daos.Dao) error {
|
||||
// // some long running read&write transaction...
|
||||
// })
|
||||
// }
|
||||
func (dao *Dao) Block(ctx context.Context) error {
|
||||
if dao.sem == nil {
|
||||
dao.mux.Lock()
|
||||
dao.sem = semaphore.NewWeighted(1)
|
||||
dao.mux.Unlock()
|
||||
}
|
||||
|
||||
return dao.sem.Acquire(ctx, 1)
|
||||
}
|
||||
|
||||
// Continue releases the previously acquired Block() lock.
|
||||
func (dao *Dao) Continue() {
|
||||
if dao.sem == nil {
|
||||
return
|
||||
}
|
||||
|
||||
dao.sem.Release(1)
|
||||
}
|
||||
|
||||
// ModelQuery creates a new query with preset Select and From fields
|
||||
// based on the provided model argument.
|
||||
func (dao *Dao) ModelQuery(m models.Model) *dbx.SelectQuery {
|
||||
tableName := m.TableName()
|
||||
return dao.db.Select(fmt.Sprintf("{{%s}}.*", tableName)).From(tableName)
|
||||
return dao.db.Select("{{" + tableName + "}}.*").From(tableName)
|
||||
}
|
||||
|
||||
// FindById finds a single db record with the specified id and
|
||||
@@ -63,7 +111,17 @@ func (dao *Dao) RunInTransaction(fn func(txDao *Dao) error) error {
|
||||
case *dbx.Tx:
|
||||
// nested transactions are not supported by default
|
||||
// so execute the function within the current transaction
|
||||
return fn(dao)
|
||||
|
||||
// create a new dao with the same hooks to avoid semaphore deadlock when nesting
|
||||
txDao := New(txOrDB)
|
||||
txDao.BeforeCreateFunc = dao.BeforeCreateFunc
|
||||
txDao.BeforeUpdateFunc = dao.BeforeUpdateFunc
|
||||
txDao.BeforeDeleteFunc = dao.BeforeDeleteFunc
|
||||
txDao.AfterCreateFunc = dao.AfterCreateFunc
|
||||
txDao.AfterUpdateFunc = dao.AfterUpdateFunc
|
||||
txDao.AfterDeleteFunc = dao.AfterDeleteFunc
|
||||
|
||||
return fn(txDao)
|
||||
case *dbx.DB:
|
||||
afterCalls := []afterCallGroup{}
|
||||
|
||||
@@ -131,30 +189,36 @@ func (dao *Dao) Delete(m models.Model) error {
|
||||
return errors.New("ID is not set")
|
||||
}
|
||||
|
||||
if dao.BeforeDeleteFunc != nil {
|
||||
if err := dao.BeforeDeleteFunc(dao, m); err != nil {
|
||||
return dao.failRetry(func(retryDao *Dao) error {
|
||||
if retryDao.BeforeDeleteFunc != nil {
|
||||
if err := retryDao.BeforeDeleteFunc(retryDao, m); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := retryDao.db.Model(m).Delete(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := dao.db.Model(m).Delete(); err != nil {
|
||||
return err
|
||||
}
|
||||
if retryDao.AfterDeleteFunc != nil {
|
||||
retryDao.AfterDeleteFunc(retryDao, m)
|
||||
}
|
||||
|
||||
if dao.AfterDeleteFunc != nil {
|
||||
dao.AfterDeleteFunc(dao, m)
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
}, DefaultMaxFailRetries)
|
||||
}
|
||||
|
||||
// Save upserts (update or create if primary key is not set) the provided model.
|
||||
func (dao *Dao) Save(m models.Model) error {
|
||||
if m.IsNew() {
|
||||
return dao.create(m)
|
||||
return dao.failRetry(func(retryDao *Dao) error {
|
||||
return retryDao.create(m)
|
||||
}, DefaultMaxFailRetries)
|
||||
}
|
||||
|
||||
return dao.update(m)
|
||||
return dao.failRetry(func(retryDao *Dao) error {
|
||||
return retryDao.update(m)
|
||||
}, DefaultMaxFailRetries)
|
||||
}
|
||||
|
||||
func (dao *Dao) update(m models.Model) error {
|
||||
@@ -247,3 +311,35 @@ func (dao *Dao) create(m models.Model) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dao *Dao) failRetry(op func(retryDao *Dao) error, maxRetries int) error {
|
||||
retryDao := dao
|
||||
attempts := 1
|
||||
|
||||
Retry:
|
||||
if attempts == 2 {
|
||||
// assign new Dao without the before hooks to avoid triggering
|
||||
// the already fired before callbacks multiple times
|
||||
retryDao = &Dao{
|
||||
db: dao.db,
|
||||
AfterCreateFunc: dao.AfterCreateFunc,
|
||||
AfterUpdateFunc: dao.AfterUpdateFunc,
|
||||
AfterDeleteFunc: dao.AfterDeleteFunc,
|
||||
}
|
||||
}
|
||||
|
||||
// execute
|
||||
err := op(retryDao)
|
||||
|
||||
if err != nil &&
|
||||
attempts < maxRetries &&
|
||||
// note: we are checking the err message so that we can handle both the cgo and noncgo errors
|
||||
strings.Contains(err.Error(), "database is locked") {
|
||||
// wait and retry
|
||||
time.Sleep(time.Duration(200*attempts) * time.Millisecond)
|
||||
attempts++
|
||||
goto Retry
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -289,6 +289,170 @@ func TestDaoDelete(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaoRetryCreate(t *testing.T) {
|
||||
testApp, _ := tests.NewTestApp()
|
||||
defer testApp.Cleanup()
|
||||
|
||||
// init mock retry dao
|
||||
retryBeforeCreateHookCalls := 0
|
||||
retryAfterCreateHookCalls := 0
|
||||
retryDao := daos.New(testApp.DB())
|
||||
retryDao.BeforeCreateFunc = func(eventDao *daos.Dao, m models.Model) error {
|
||||
retryBeforeCreateHookCalls++
|
||||
return errors.New("database is locked")
|
||||
}
|
||||
retryDao.AfterCreateFunc = func(eventDao *daos.Dao, m models.Model) {
|
||||
retryAfterCreateHookCalls++
|
||||
}
|
||||
|
||||
model := &models.Admin{Email: "new@example.com"}
|
||||
if err := retryDao.Save(model); err != nil {
|
||||
t.Fatalf("Expected nil after retry, got error: %v", err)
|
||||
}
|
||||
|
||||
// the before hook is expected to be called only once because
|
||||
// it is ignored after the first "database is locked" error
|
||||
if retryBeforeCreateHookCalls != 1 {
|
||||
t.Fatalf("Expected before hook calls to be 1, got %d", retryBeforeCreateHookCalls)
|
||||
}
|
||||
|
||||
if retryAfterCreateHookCalls != 1 {
|
||||
t.Fatalf("Expected after hook calls to be 1, got %d", retryAfterCreateHookCalls)
|
||||
}
|
||||
|
||||
// with non-locking error
|
||||
retryBeforeCreateHookCalls = 0
|
||||
retryAfterCreateHookCalls = 0
|
||||
retryDao.BeforeCreateFunc = func(eventDao *daos.Dao, m models.Model) error {
|
||||
retryBeforeCreateHookCalls++
|
||||
return errors.New("non-locking error")
|
||||
}
|
||||
|
||||
dummy := &models.Admin{Email: "test@example.com"}
|
||||
if err := retryDao.Save(dummy); err == nil {
|
||||
t.Fatal("Expected error, got nil")
|
||||
}
|
||||
|
||||
if retryBeforeCreateHookCalls != 1 {
|
||||
t.Fatalf("Expected before hook calls to be 1, got %d", retryBeforeCreateHookCalls)
|
||||
}
|
||||
|
||||
if retryAfterCreateHookCalls != 0 {
|
||||
t.Fatalf("Expected after hook calls to be 0, got %d", retryAfterCreateHookCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaoRetryUpdate(t *testing.T) {
|
||||
testApp, _ := tests.NewTestApp()
|
||||
defer testApp.Cleanup()
|
||||
|
||||
model, err := testApp.Dao().FindAdminByEmail("test@example.com")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// init mock retry dao
|
||||
retryBeforeUpdateHookCalls := 0
|
||||
retryAfterUpdateHookCalls := 0
|
||||
retryDao := daos.New(testApp.DB())
|
||||
retryDao.BeforeUpdateFunc = func(eventDao *daos.Dao, m models.Model) error {
|
||||
retryBeforeUpdateHookCalls++
|
||||
return errors.New("database is locked")
|
||||
}
|
||||
retryDao.AfterUpdateFunc = func(eventDao *daos.Dao, m models.Model) {
|
||||
retryAfterUpdateHookCalls++
|
||||
}
|
||||
|
||||
if err := retryDao.Save(model); err != nil {
|
||||
t.Fatalf("Expected nil after retry, got error: %v", err)
|
||||
}
|
||||
|
||||
// the before hook is expected to be called only once because
|
||||
// it is ignored after the first "database is locked" error
|
||||
if retryBeforeUpdateHookCalls != 1 {
|
||||
t.Fatalf("Expected before hook calls to be 1, got %d", retryBeforeUpdateHookCalls)
|
||||
}
|
||||
|
||||
if retryAfterUpdateHookCalls != 1 {
|
||||
t.Fatalf("Expected after hook calls to be 1, got %d", retryAfterUpdateHookCalls)
|
||||
}
|
||||
|
||||
// with non-locking error
|
||||
retryBeforeUpdateHookCalls = 0
|
||||
retryAfterUpdateHookCalls = 0
|
||||
retryDao.BeforeUpdateFunc = func(eventDao *daos.Dao, m models.Model) error {
|
||||
retryBeforeUpdateHookCalls++
|
||||
return errors.New("non-locking error")
|
||||
}
|
||||
|
||||
if err := retryDao.Save(model); err == nil {
|
||||
t.Fatal("Expected error, got nil")
|
||||
}
|
||||
|
||||
if retryBeforeUpdateHookCalls != 1 {
|
||||
t.Fatalf("Expected before hook calls to be 1, got %d", retryBeforeUpdateHookCalls)
|
||||
}
|
||||
|
||||
if retryAfterUpdateHookCalls != 0 {
|
||||
t.Fatalf("Expected after hook calls to be 0, got %d", retryAfterUpdateHookCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaoRetryDelete(t *testing.T) {
|
||||
testApp, _ := tests.NewTestApp()
|
||||
defer testApp.Cleanup()
|
||||
|
||||
// init mock retry dao
|
||||
retryBeforeDeleteHookCalls := 0
|
||||
retryAfterDeleteHookCalls := 0
|
||||
retryDao := daos.New(testApp.DB())
|
||||
retryDao.BeforeDeleteFunc = func(eventDao *daos.Dao, m models.Model) error {
|
||||
retryBeforeDeleteHookCalls++
|
||||
return errors.New("database is locked")
|
||||
}
|
||||
retryDao.AfterDeleteFunc = func(eventDao *daos.Dao, m models.Model) {
|
||||
retryAfterDeleteHookCalls++
|
||||
}
|
||||
|
||||
model, _ := retryDao.FindAdminByEmail("test@example.com")
|
||||
if err := retryDao.Delete(model); err != nil {
|
||||
t.Fatalf("Expected nil after retry, got error: %v", err)
|
||||
}
|
||||
|
||||
// the before hook is expected to be called only once because
|
||||
// it is ignored after the first "database is locked" error
|
||||
if retryBeforeDeleteHookCalls != 1 {
|
||||
t.Fatalf("Expected before hook calls to be 1, got %d", retryBeforeDeleteHookCalls)
|
||||
}
|
||||
|
||||
if retryAfterDeleteHookCalls != 1 {
|
||||
t.Fatalf("Expected after hook calls to be 1, got %d", retryAfterDeleteHookCalls)
|
||||
}
|
||||
|
||||
// with non-locking error
|
||||
retryBeforeDeleteHookCalls = 0
|
||||
retryAfterDeleteHookCalls = 0
|
||||
retryDao.BeforeDeleteFunc = func(eventDao *daos.Dao, m models.Model) error {
|
||||
retryBeforeDeleteHookCalls++
|
||||
return errors.New("non-locking error")
|
||||
}
|
||||
|
||||
dummy := &models.Admin{}
|
||||
dummy.RefreshId()
|
||||
dummy.MarkAsNotNew()
|
||||
if err := retryDao.Delete(dummy); err == nil {
|
||||
t.Fatal("Expected error, got nil")
|
||||
}
|
||||
|
||||
if retryBeforeDeleteHookCalls != 1 {
|
||||
t.Fatalf("Expected before hook calls to be 1, got %d", retryBeforeDeleteHookCalls)
|
||||
}
|
||||
|
||||
if retryAfterDeleteHookCalls != 0 {
|
||||
t.Fatalf("Expected after hook calls to be 0, got %d", retryAfterDeleteHookCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaoBeforeHooksError(t *testing.T) {
|
||||
testApp, _ := tests.NewTestApp()
|
||||
defer testApp.Cleanup()
|
||||
|
||||
+17
-39
@@ -1,6 +1,7 @@
|
||||
package daos
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
@@ -348,50 +349,27 @@ func (dao *Dao) SaveRecord(record *models.Record) error {
|
||||
// The delete operation may fail if the record is part of a required
|
||||
// reference in another record (aka. cannot be deleted or set to NULL).
|
||||
func (dao *Dao) DeleteRecord(record *models.Record) error {
|
||||
const maxAttempts = 6
|
||||
|
||||
attempts := 1
|
||||
|
||||
Retry:
|
||||
err := dao.deleteRecord(record, attempts)
|
||||
if err != nil &&
|
||||
attempts <= maxAttempts &&
|
||||
// note: we are checking the error msg so that we can handle both the cgo and noncgo errors
|
||||
strings.Contains(err.Error(), "database is locked") {
|
||||
time.Sleep(time.Duration(300*attempts) * time.Millisecond)
|
||||
attempts++
|
||||
goto Retry
|
||||
// fetch rel references (if any)
|
||||
//
|
||||
// note: the select is outside of the transaction to minimize
|
||||
// SQLITE_BUSY errors when mixing read&write in a single transaction
|
||||
refs, err := dao.FindCollectionReferences(record.Collection())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (dao *Dao) deleteRecord(record *models.Record, attempts int) error {
|
||||
return dao.RunInTransaction(func(txDao *Dao) error {
|
||||
// unset transaction dao before hook on retry to avoid
|
||||
// triggering the same before callbacks multiple times
|
||||
if attempts > 1 {
|
||||
oldBeforeCreateFunc := txDao.BeforeCreateFunc
|
||||
oldBeforeUpdateFunc := txDao.BeforeUpdateFunc
|
||||
oldBeforeDeleteFunc := txDao.BeforeDeleteFunc
|
||||
txDao.BeforeCreateFunc = nil
|
||||
txDao.BeforeUpdateFunc = nil
|
||||
txDao.BeforeDeleteFunc = nil
|
||||
defer func() {
|
||||
if txDao != nil {
|
||||
txDao.BeforeCreateFunc = oldBeforeCreateFunc
|
||||
txDao.BeforeUpdateFunc = oldBeforeUpdateFunc
|
||||
txDao.BeforeDeleteFunc = oldBeforeDeleteFunc
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// check for references
|
||||
refs, err := txDao.FindCollectionReferences(record.Collection())
|
||||
if err != nil {
|
||||
// run all consequent DeleteRecord requests synchroniously
|
||||
// to minimize SQLITE_BUSY errors
|
||||
if len(refs) > 0 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
if err := dao.Block(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer dao.Continue()
|
||||
}
|
||||
|
||||
return dao.RunInTransaction(func(txDao *Dao) error {
|
||||
// check if related records has to be deleted (if `CascadeDelete` is set)
|
||||
// OR
|
||||
// just unset the record id from any relation field values (if they are not required)
|
||||
|
||||
Reference in New Issue
Block a user