added split (sync and async) db connections pool

This commit is contained in:
Gani Georgiev
2022-12-15 16:42:35 +02:00
parent e964b019c2
commit b9e257d2b1
13 changed files with 304 additions and 127 deletions
+50 -17
View File
@@ -17,17 +17,29 @@ import (
const DefaultMaxFailRetries = 5
// New creates a new Dao instance with the provided db builder.
// New creates a new Dao instance with the provided db builder
// (for both async and sync db operations).
func New(db dbx.Builder) *Dao {
return NewMultiDB(db, db)
}
// New creates a new Dao instance with the provided dedicated
// async and sync db builders.
func NewMultiDB(asyncDB, syncDB dbx.Builder) *Dao {
return &Dao{
db: db,
asyncDB: asyncDB,
syncDB: syncDB,
}
}
// Dao handles various db operations.
// Think of Dao as a repository and service layer in one.
type Dao struct {
db dbx.Builder
// in a transaction both refer to the same *dbx.TX instance
asyncDB dbx.Builder
syncDB dbx.Builder
// @todo delete after removing Block and Continue
sem *semaphore.Weighted
mux sync.RWMutex
@@ -39,11 +51,29 @@ type Dao struct {
AfterDeleteFunc func(eventDao *Dao, m models.Model)
}
// DB returns the internal db builder (*dbx.DB or *dbx.TX).
// DB returns the default dao db builder (*dbx.DB or *dbx.TX).
//
// Currently the default db builder is dao.asyncDB but that may change in the future.
func (dao *Dao) DB() dbx.Builder {
return dao.db
return dao.AsyncDB()
}
// AsyncDB returns the dao asynchronous db builder (*dbx.DB or *dbx.TX).
//
// In a transaction the asyncDB and syncDB refer to the same *dbx.TX instance.
func (dao *Dao) AsyncDB() dbx.Builder {
return dao.asyncDB
}
// SyncDB returns the dao synchronous db builder (*dbx.DB or *dbx.TX).
//
// In a transaction the asyncDB and syncDB refer to the same *dbx.TX instance.
func (dao *Dao) SyncDB() dbx.Builder {
return dao.syncDB
}
// Deprecated: Will be removed in the next releases. Use [Dao.SyncDB()] instead.
//
// Block acquires a lock and blocks all other go routines that uses
// the Dao instance until dao.Continue() is called, effectively making
// the concurrent requests to perform synchronous db operations.
@@ -75,6 +105,8 @@ func (dao *Dao) Block(ctx context.Context) error {
return dao.sem.Acquire(ctx, 1)
}
// Deprecated: Will be removed in the next releases. Use [Dao.SyncDB()] instead.
//
// Continue releases the previously acquired Block() lock.
func (dao *Dao) Continue() {
if dao.sem == nil {
@@ -88,7 +120,7 @@ func (dao *Dao) Continue() {
// based on the provided model argument.
func (dao *Dao) ModelQuery(m models.Model) *dbx.SelectQuery {
tableName := m.TableName()
return dao.db.Select("{{" + tableName + "}}.*").From(tableName)
return dao.DB().Select("{{" + tableName + "}}.*").From(tableName)
}
// FindById finds a single db record with the specified id and
@@ -105,9 +137,9 @@ type afterCallGroup struct {
// RunInTransaction wraps fn into a transaction.
//
// It is safe to nest RunInTransaction calls.
// It is safe to nest RunInTransaction calls as long as you use the txDao.
func (dao *Dao) RunInTransaction(fn func(txDao *Dao) error) error {
switch txOrDB := dao.db.(type) {
switch txOrDB := dao.SyncDB().(type) {
case *dbx.Tx:
// nested transactions are not supported by default
// so execute the function within the current transaction
@@ -165,14 +197,15 @@ func (dao *Dao) RunInTransaction(fn func(txDao *Dao) error) error {
if txError == nil {
// execute after event calls on successful transaction
// (note: using the non-transaction dao to allow following queries in the after hooks)
for _, call := range afterCalls {
switch call.Action {
case "create":
dao.AfterCreateFunc(call.EventDao, call.Model)
dao.AfterCreateFunc(dao, call.Model)
case "update":
dao.AfterUpdateFunc(call.EventDao, call.Model)
dao.AfterUpdateFunc(dao, call.Model)
case "delete":
dao.AfterDeleteFunc(call.EventDao, call.Model)
dao.AfterDeleteFunc(dao, call.Model)
}
}
}
@@ -196,7 +229,7 @@ func (dao *Dao) Delete(m models.Model) error {
}
}
if err := retryDao.db.Model(m).Delete(); err != nil {
if err := retryDao.SyncDB().Model(m).Delete(); err != nil {
return err
}
@@ -241,7 +274,7 @@ func (dao *Dao) update(m models.Model) error {
if v, ok := any(m).(models.ColumnValueMapper); ok {
dataMap := v.ColumnValueMap()
_, err := dao.db.Update(
_, err := dao.SyncDB().Update(
m.TableName(),
dataMap,
dbx.HashExp{"id": m.GetId()},
@@ -251,7 +284,7 @@ func (dao *Dao) update(m models.Model) error {
return err
}
} else {
if err := dao.db.Model(m).Update(); err != nil {
if err := dao.SyncDB().Model(m).Update(); err != nil {
return err
}
}
@@ -292,12 +325,12 @@ func (dao *Dao) create(m models.Model) error {
dataMap["id"] = m.GetId()
}
_, err := dao.db.Insert(m.TableName(), dataMap).Execute()
_, err := dao.SyncDB().Insert(m.TableName(), dataMap).Execute()
if err != nil {
return err
}
} else {
if err := dao.db.Model(m).Insert(); err != nil {
if err := dao.SyncDB().Model(m).Insert(); err != nil {
return err
}
}
@@ -320,7 +353,7 @@ Retry:
if attempts == 2 {
// assign new Dao without the before hooks to avoid triggering
// the already fired before callbacks multiple times
retryDao = New(dao.db)
retryDao = NewMultiDB(dao.asyncDB, dao.syncDB)
retryDao.AfterCreateFunc = dao.AfterCreateFunc
retryDao.AfterUpdateFunc = dao.AfterUpdateFunc
retryDao.AfterDeleteFunc = dao.AfterDeleteFunc
+19
View File
@@ -20,6 +20,25 @@ func TestNew(t *testing.T) {
}
}
func TestNewMultiDB(t *testing.T) {
testApp, _ := tests.NewTestApp()
defer testApp.Cleanup()
dao := daos.NewMultiDB(testApp.Dao().AsyncDB(), testApp.Dao().SyncDB())
if dao.DB() != testApp.Dao().AsyncDB() {
t.Fatal("[db-asyncdb] The 2 db instances are different")
}
if dao.AsyncDB() != testApp.Dao().AsyncDB() {
t.Fatal("[asyncdb-asyncdb] The 2 db instances are different")
}
if dao.SyncDB() != testApp.Dao().SyncDB() {
t.Fatal("[syncdb-syncdb] The 2 db instances are different")
}
}
func TestDaoModelQuery(t *testing.T) {
testApp, _ := tests.NewTestApp()
defer testApp.Cleanup()
+3 -18
View File
@@ -1,12 +1,10 @@
package daos
import (
"context"
"errors"
"fmt"
"math"
"strings"
"time"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/models"
@@ -359,25 +357,12 @@ func (dao *Dao) DeleteRecord(record *models.Record) error {
return err
}
// run all consequent DeleteRecord requests synchroniously
// to minimize SQLITE_BUSY errors
if len(refs) > 0 {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
if err := dao.Block(ctx); err != nil {
// ignore blocking and try to run directly...
} else {
defer dao.Continue()
}
}
return dao.RunInTransaction(func(txDao *Dao) error {
// manually trigger delete on any linked external auth to ensure
// that the `OnModel*` hooks are triggered.
//
// note: the select is outside of the transaction to minimize
// SQLITE_BUSY errors when mixing read&write in a single transaction
// that the `OnModel*` hooks are triggered
if record.Collection().IsAuth() {
// note: the select is outside of the transaction to minimize
// SQLITE_BUSY errors when mixing read&write in a single transaction
externalAuths, err := dao.FindAllExternalAuthsByRecord(record)
if err != nil {
return err
+8 -2
View File
@@ -634,10 +634,16 @@ func TestDeleteRecord(t *testing.T) {
// delete existing record + cascade
// ---
calledQueries := []string{}
app.DB().QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
app.Dao().SyncDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
calledQueries = append(calledQueries, sql)
}
app.DB().ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
app.Dao().AsyncDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
calledQueries = append(calledQueries, sql)
}
app.Dao().SyncDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
calledQueries = append(calledQueries, sql)
}
app.Dao().AsyncDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
calledQueries = append(calledQueries, sql)
}
rec3, _ := app.Dao().FindRecordById("users", "oap640cot4yru2s")