logs refactoring

This commit is contained in:
Gani Georgiev
2023-11-26 13:33:17 +02:00
parent ff5535f4de
commit 821aae4a62
109 changed files with 7320 additions and 3728 deletions
+8 -5
View File
@@ -5,6 +5,7 @@ package core
import (
"context"
"log/slog"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/daos"
@@ -48,6 +49,9 @@ type App interface {
// the users table from LogsDao will result in error.
LogsDao() *daos.Dao
// Logger returns the active app logger.
Logger() *slog.Logger
// DataDir returns the app data directory path.
DataDir() string
@@ -55,16 +59,15 @@ type App interface {
// (used for settings encryption).
EncryptionEnv() string
// IsDebug returns whether the app is in debug mode
// (showing more detailed error logs, executed sql statements, etc.).
IsDebug() bool
// Settings returns the loaded app settings.
Settings() *settings.Settings
// Cache returns the app internal cache store.
// Deprecated: Use app.Store() instead.
Cache() *store.Store[any]
// Store returns the app runtime store.
Store() *store.Store[any]
// SubscriptionsBroker returns the app realtime subscriptions broker instance.
SubscriptionsBroker() *subscriptions.Broker
+144 -40
View File
@@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"log"
"log/slog"
"os"
"path/filepath"
"runtime"
@@ -18,10 +19,14 @@ import (
"github.com/pocketbase/pocketbase/models/settings"
"github.com/pocketbase/pocketbase/tools/filesystem"
"github.com/pocketbase/pocketbase/tools/hook"
"github.com/pocketbase/pocketbase/tools/logger"
"github.com/pocketbase/pocketbase/tools/mailer"
"github.com/pocketbase/pocketbase/tools/routine"
"github.com/pocketbase/pocketbase/tools/security"
"github.com/pocketbase/pocketbase/tools/store"
"github.com/pocketbase/pocketbase/tools/subscriptions"
"github.com/pocketbase/pocketbase/tools/types"
"github.com/spf13/cast"
)
const (
@@ -39,8 +44,9 @@ var _ App = (*BaseApp)(nil)
// BaseApp implements core.App and defines the base PocketBase app structure.
type BaseApp struct {
// @todo consider introducing a mutex to allow safe concurrent config changes during runtime
// configurable parameters
isDebug bool
dataDir string
encryptionEnv string
dataMaxOpenConns int
@@ -49,11 +55,12 @@ type BaseApp struct {
logsMaxIdleConns int
// internals
cache *store.Store[any]
store *store.Store[any]
settings *settings.Settings
dao *daos.Dao
logsDao *daos.Dao
subscriptionsBroker *subscriptions.Broker
logger *slog.Logger
// app event hooks
onBeforeBootstrap *hook.Hook[*BootstrapEvent]
@@ -169,7 +176,6 @@ type BaseApp struct {
type BaseAppConfig struct {
DataDir string
EncryptionEnv string
IsDebug bool
DataMaxOpenConns int // default to 500
DataMaxIdleConns int // default 20
LogsMaxOpenConns int // default to 100
@@ -183,13 +189,12 @@ type BaseAppConfig struct {
func NewBaseApp(config BaseAppConfig) *BaseApp {
app := &BaseApp{
dataDir: config.DataDir,
isDebug: config.IsDebug,
encryptionEnv: config.EncryptionEnv,
dataMaxOpenConns: config.DataMaxOpenConns,
dataMaxIdleConns: config.DataMaxIdleConns,
logsMaxOpenConns: config.LogsMaxOpenConns,
logsMaxIdleConns: config.LogsMaxIdleConns,
cache: store.New[any](nil),
store: store.New[any](nil),
settings: settings.New(),
subscriptionsBroker: subscriptions.NewBroker(),
@@ -314,6 +319,17 @@ func (app *BaseApp) IsBootstrapped() bool {
return app.dao != nil && app.logsDao != nil && app.settings != nil
}
// Logger returns the active application logger.
//
// Before Bootstrap has initialized the internal batch logger this
// falls back to slog.Default() so that callers never receive nil.
func (app *BaseApp) Logger() *slog.Logger {
	if l := app.logger; l != nil {
		return l
	}
	return slog.Default()
}
// Bootstrap initializes the application
// (aka. create data dir, open db connections, load settings, etc.).
//
@@ -343,6 +359,10 @@ func (app *BaseApp) Bootstrap() error {
return err
}
if err := app.initLogger(); err != nil {
return err
}
// we don't check for an error because the db migrations may have not been executed yet
app.RefreshSettings()
@@ -438,20 +458,20 @@ func (app *BaseApp) EncryptionEnv() string {
return app.encryptionEnv
}
// IsDebug reports whether the app runs in debug mode,
// i.e. with more detailed error logs and printed SQL statements.
func (app *BaseApp) IsDebug() bool {
	return app.isDebug
}
// Settings returns the currently loaded app settings instance.
func (app *BaseApp) Settings() *settings.Settings {
	return app.settings
}
// Cache returns the app runtime store.
//
// Deprecated: Use app.Store() instead.
func (app *BaseApp) Cache() *store.Store[any] {
	// fix: the diff-merged body had an unreachable warning after
	// "return app.cache" and the message deprecated the wrong method
	// ("app.Store()" instead of "app.Cache()")
	color.Yellow("app.Cache() is soft-deprecated. Please replace it with app.Store().")
	return app.Store()
}
// Store returns the app's internal runtime key-value store.
func (app *BaseApp) Store() *store.Store[any] {
	return app.store
}
// SubscriptionsBroker returns the app realtime subscriptions broker instance.
@@ -569,6 +589,11 @@ func (app *BaseApp) RefreshSettings() error {
return err
}
// reload handler level (if initialized)
if h, ok := app.Logger().Handler().(*logger.BatchHandler); ok {
h.SetLevel(slog.Level(app.settings.Logs.MinLevel))
}
return nil
}
@@ -988,7 +1013,7 @@ func (app *BaseApp) initLogsDB() error {
}
concurrentDB.DB().SetMaxOpenConns(maxOpenConns)
concurrentDB.DB().SetMaxIdleConns(maxIdleConns)
concurrentDB.DB().SetConnMaxIdleTime(5 * time.Minute)
concurrentDB.DB().SetConnMaxIdleTime(3 * time.Minute)
nonconcurrentDB, err := connectDB(filepath.Join(app.DataDir(), "logs.db"))
if err != nil {
@@ -996,7 +1021,7 @@ func (app *BaseApp) initLogsDB() error {
}
nonconcurrentDB.DB().SetMaxOpenConns(1)
nonconcurrentDB.DB().SetMaxIdleConns(1)
nonconcurrentDB.DB().SetConnMaxIdleTime(5 * time.Minute)
nonconcurrentDB.DB().SetConnMaxIdleTime(3 * time.Minute)
app.logsDao = daos.NewMultiDB(concurrentDB, nonconcurrentDB)
@@ -1019,7 +1044,7 @@ func (app *BaseApp) initDataDB() error {
}
concurrentDB.DB().SetMaxOpenConns(maxOpenConns)
concurrentDB.DB().SetMaxIdleConns(maxIdleConns)
concurrentDB.DB().SetConnMaxIdleTime(5 * time.Minute)
concurrentDB.DB().SetConnMaxIdleTime(3 * time.Minute)
nonconcurrentDB, err := connectDB(filepath.Join(app.DataDir(), "data.db"))
if err != nil {
@@ -1027,19 +1052,17 @@ func (app *BaseApp) initDataDB() error {
}
nonconcurrentDB.DB().SetMaxOpenConns(1)
nonconcurrentDB.DB().SetMaxIdleConns(1)
nonconcurrentDB.DB().SetConnMaxIdleTime(5 * time.Minute)
nonconcurrentDB.DB().SetConnMaxIdleTime(3 * time.Minute)
if app.IsDebug() {
nonconcurrentDB.QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
color.HiBlack("[%.2fms] %v\n", float64(t.Milliseconds()), sql)
}
concurrentDB.QueryLogFunc = nonconcurrentDB.QueryLogFunc
nonconcurrentDB.ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
color.HiBlack("[%.2fms] %v\n", float64(t.Milliseconds()), sql)
}
concurrentDB.ExecLogFunc = nonconcurrentDB.ExecLogFunc
}
// @todo benchmark whether it will have an impact if always enabled as TRACE log
// nonconcurrentDB.QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
// color.HiBlack("[%.2fms] %v\n", float64(t.Milliseconds()), sql)
// }
// concurrentDB.QueryLogFunc = nonconcurrentDB.QueryLogFunc
// nonconcurrentDB.ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
// color.HiBlack("[%.2fms] %v\n", float64(t.Milliseconds()), sql)
// }
// concurrentDB.ExecLogFunc = nonconcurrentDB.ExecLogFunc
app.dao = app.createDaoWithHooks(concurrentDB, nonconcurrentDB)
@@ -1129,14 +1152,13 @@ func (app *BaseApp) registerDefaultHooks() {
// run in the background for "optimistic" delete to avoid
// blocking the delete transaction
//
// @todo consider creating a bg process queue so that the
// call could be "retried" in case of a failure.
routine.FireAndForget(func() {
if err := deletePrefix(prefix); err != nil && app.IsDebug() {
// non critical error - only log for debug
// (usually could happen because of S3 api limits)
log.Println(err)
if err := deletePrefix(prefix); err != nil {
app.Logger().Error(
"Failed to delete storage prefix (non critical error; usually could happen because of S3 api limits)",
slog.String("prefix", prefix),
slog.String("error", err.Error()),
)
}
})
}
@@ -1144,12 +1166,94 @@ func (app *BaseApp) registerDefaultHooks() {
return nil
})
app.OnTerminate().Add(func(e *TerminateEvent) error {
app.ResetBootstrapState()
if err := app.initAutobackupHooks(); err != nil {
app.Logger().Error("Failed to init auto backup hooks", slog.String("error", err.Error()))
}
}
// initLogger initializes and assigns the app batch logger.
//
// Accumulated log entries are flushed to the logs database either when
// the handler's batch size threshold is reached or after ~3s of log
// inactivity (tracked via the ticker below).
func (app *BaseApp) initLogger() error {
	duration := 3 * time.Second
	ticker := time.NewTicker(duration)
	done := make(chan bool)

	// default to INFO until the settings are loaded
	level := slog.LevelInfo
	if app.Settings() != nil {
		level = slog.Level(app.Settings().Logs.MinLevel)
	}

	handler := logger.NewBatchHandler(logger.BatchOptions{
		Level:     level,
		BatchSize: 200,
		BeforeAddFunc: func(ctx context.Context, log *logger.Log) bool {
			// postpone the time-based flush while new logs keep coming in
			ticker.Reset(duration)
			return true
		},
		WriteFunc: func(ctx context.Context, logs []*logger.Log) error {
			if !app.IsBootstrapped() {
				return nil
			}

			// write the accumulated logs
			// (note: based on several local tests there is no significant performance difference between small number of separate write queries vs 1 big INSERT)
			app.LogsDao().RunInTransaction(func(txDao *daos.Dao) error {
				model := &models.Log{}
				for _, l := range logs {
					model.MarkAsNew()
					// note: using pseudorandom for a slightly better performance
					model.Id = security.PseudorandomStringWithAlphabet(models.DefaultIdLength, models.DefaultIdAlphabet)
					model.Level = int(l.Level)
					model.Message = l.Message
					model.Data = l.Data
					model.Created, _ = types.ParseDateTime(l.Time)
					model.Updated = model.Created
					if err := txDao.SaveLog(model); err != nil {
						log.Println("Failed to write log", model, err)
					}
				}
				return nil
			})

			// delete old logs
			// ---
			logsMaxDays := app.Settings().Logs.MaxDays
			now := time.Now()
			lastLogsDeletedAt := cast.ToTime(app.Store().Get("lastLogsDeletedAt"))
			// fix: elapsed days is Hours()/24 — the previous "Hours() * 24"
			// inflated the value 576x, triggering deletion on nearly every flush
			daysDiff := now.Sub(lastLogsDeletedAt).Hours() / 24
			if daysDiff > float64(logsMaxDays) {
				deleteErr := app.LogsDao().DeleteOldLogs(now.AddDate(0, 0, -1*logsMaxDays))
				if deleteErr == nil {
					app.Store().Set("lastLogsDeletedAt", now)
				} else {
					log.Println("Logs delete failed", deleteErr)
				}
			}

			return nil
		},
	})

	ctx := context.Background()

	go func() {
		for {
			select {
			case <-done:
				// final flush, then stop the goroutine
				// (fix: previously missing return leaked this goroutine
				// after termination, blocked forever on the stopped ticker)
				handler.WriteAll(ctx)
				return
			case <-ticker.C:
				handler.WriteAll(ctx)
			}
		}
	}()

	app.logger = slog.New(handler)

	app.OnTerminate().PreAdd(func(e *TerminateEvent) error {
		ticker.Stop()
		done <- true
		return nil
	})

	return nil
}
+40 -23
View File
@@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"io"
"log"
"log/slog"
"os"
"path/filepath"
"runtime"
@@ -22,8 +22,11 @@ import (
"github.com/pocketbase/pocketbase/tools/security"
)
// Deprecated: Replaced with StoreKeyActiveBackup.
const CacheKeyActiveBackup string = "@activeBackup"
const StoreKeyActiveBackup string = "@activeBackup"
// CreateBackup creates a new backup of the current app pb_data directory.
//
// If name is empty, it will be autogenerated.
@@ -43,7 +46,7 @@ const CacheKeyActiveBackup string = "@activeBackup"
//
// Backups can be stored on S3 if it is configured in app.Settings().Backups.
func (app *BaseApp) CreateBackup(ctx context.Context, name string) error {
if app.Cache().Has(CacheKeyActiveBackup) {
if app.Store().Has(StoreKeyActiveBackup) {
return errors.New("try again later - another backup/restore operation has already been started")
}
@@ -51,8 +54,8 @@ func (app *BaseApp) CreateBackup(ctx context.Context, name string) error {
name = app.generateBackupName("pb_backup_")
}
app.Cache().Set(CacheKeyActiveBackup, name)
defer app.Cache().Remove(CacheKeyActiveBackup)
app.Store().Set(StoreKeyActiveBackup, name)
defer app.Store().Remove(StoreKeyActiveBackup)
// root dir entries to exclude from the backup generation
exclude := []string{LocalBackupsDirName, LocalTempDirName}
@@ -135,12 +138,12 @@ func (app *BaseApp) RestoreBackup(ctx context.Context, name string) error {
return errors.New("restore is not supported on windows")
}
if app.Cache().Has(CacheKeyActiveBackup) {
if app.Store().Has(StoreKeyActiveBackup) {
return errors.New("try again later - another backup/restore operation has already been started")
}
app.Cache().Set(CacheKeyActiveBackup, name)
defer app.Cache().Remove(CacheKeyActiveBackup)
app.Store().Set(StoreKeyActiveBackup, name)
defer app.Store().Remove(StoreKeyActiveBackup)
fsys, err := app.NewBackupsFilesystem()
if err != nil {
@@ -189,8 +192,12 @@ func (app *BaseApp) RestoreBackup(ctx context.Context, name string) error {
// remove the extracted zip file since we no longer need it
// (this is in case the app restarts and the defer calls are not called)
if err := os.Remove(tempZip.Name()); err != nil && app.IsDebug() {
log.Println(err)
if err := os.Remove(tempZip.Name()); err != nil {
app.Logger().Debug(
"[RestoreBackup] Failed to remove the temp zip backup file",
slog.String("file", tempZip.Name()),
slog.String("error", err.Error()),
)
}
// root dir entries to exclude from the backup restore
@@ -223,8 +230,8 @@ func (app *BaseApp) RestoreBackup(ctx context.Context, name string) error {
// restart the app
if err := app.Restart(); err != nil {
if err := revertDataDirChanges(); err != nil {
panic(err)
if revertErr := revertDataDirChanges(); revertErr != nil {
panic(revertErr)
}
return fmt.Errorf("failed to restart the app process: %w", err)
@@ -251,9 +258,12 @@ func (app *BaseApp) initAutobackupHooks() error {
name := app.generateBackupName(autoPrefix)
if err := app.CreateBackup(context.Background(), name); err != nil && app.IsDebug() {
// @todo replace after logs generalization
log.Println(err)
if err := app.CreateBackup(context.Background(), name); err != nil {
app.Logger().Debug(
"[Backup cron] Failed to create backup",
slog.String("name", name),
slog.String("error", err.Error()),
)
}
maxKeep := app.Settings().Backups.CronMaxKeep
@@ -263,17 +273,21 @@ func (app *BaseApp) initAutobackupHooks() error {
}
fsys, err := app.NewBackupsFilesystem()
if err != nil && app.IsDebug() {
// @todo replace after logs generalization
log.Println(err)
if err != nil {
app.Logger().Debug(
"[Backup cron] Failed to initialize the backup filesystem",
slog.String("error", err.Error()),
)
return
}
defer fsys.Close()
files, err := fsys.List(autoPrefix)
if err != nil && app.IsDebug() {
// @todo replace after logs generalization
log.Println(err)
if err != nil {
app.Logger().Debug(
"[Backup cron] Failed to list autogenerated backups",
slog.String("error", err.Error()),
)
return
}
@@ -290,9 +304,12 @@ func (app *BaseApp) initAutobackupHooks() error {
toRemove := files[maxKeep:]
for _, f := range toRemove {
if err := fsys.Delete(f.Key); err != nil && app.IsDebug() {
// @todo replace after logs generalization
log.Println(err)
if err := fsys.Delete(f.Key); err != nil {
app.Logger().Debug(
"[Backup cron] Failed to remove old autogenerated backup",
slog.String("key", f.Key),
slog.String("error", err.Error()),
)
}
}
})
+4 -4
View File
@@ -25,11 +25,11 @@ func TestCreateBackup(t *testing.T) {
expectedAppNamePrefix := "test_" + strings.Repeat("a", 45)
// test pending error
app.Cache().Set(core.CacheKeyActiveBackup, "")
app.Store().Set(core.StoreKeyActiveBackup, "")
if err := app.CreateBackup(context.Background(), "test.zip"); err == nil {
t.Fatal("Expected pending error, got nil")
}
app.Cache().Remove(core.CacheKeyActiveBackup)
app.Store().Remove(core.StoreKeyActiveBackup)
// create with auto generated name
if err := app.CreateBackup(context.Background(), ""); err != nil {
@@ -98,11 +98,11 @@ func TestRestoreBackup(t *testing.T) {
}
// test pending error
app.Cache().Set(core.CacheKeyActiveBackup, "")
app.Store().Set(core.StoreKeyActiveBackup, "")
if err := app.RestoreBackup(context.Background(), "test"); err == nil {
t.Fatal("Expected pending error, got nil")
}
app.Cache().Remove(core.CacheKeyActiveBackup)
app.Store().Remove(core.StoreKeyActiveBackup)
// missing backup
if err := app.RestoreBackup(context.Background(), "missing"); err == nil {
+85 -19
View File
@@ -3,8 +3,12 @@ package core
import (
"os"
"testing"
"time"
"github.com/pocketbase/pocketbase/migrations/logs"
"github.com/pocketbase/pocketbase/tools/logger"
"github.com/pocketbase/pocketbase/tools/mailer"
"github.com/pocketbase/pocketbase/tools/migrate"
)
func TestNewBaseApp(t *testing.T) {
@@ -14,7 +18,6 @@ func TestNewBaseApp(t *testing.T) {
app := NewBaseApp(BaseAppConfig{
DataDir: testDataDir,
EncryptionEnv: "test_env",
IsDebug: true,
})
if app.dataDir != testDataDir {
@@ -25,12 +28,8 @@ func TestNewBaseApp(t *testing.T) {
t.Fatalf("expected encryptionEnv test_env, got %q", app.dataDir)
}
if !app.isDebug {
t.Fatalf("expected isDebug true, got %v", app.isDebug)
}
if app.cache == nil {
t.Fatal("expected cache to be set, got nil")
if app.store == nil {
t.Fatal("expected store to be set, got nil")
}
if app.settings == nil {
@@ -49,7 +48,6 @@ func TestBaseAppBootstrap(t *testing.T) {
app := NewBaseApp(BaseAppConfig{
DataDir: testDataDir,
EncryptionEnv: "pb_test_env",
IsDebug: false,
})
defer app.ResetBootstrapState()
@@ -57,7 +55,6 @@ func TestBaseAppBootstrap(t *testing.T) {
t.Fatal("Didn't expect the application to be bootstrapped.")
}
// bootstrap
if err := app.Bootstrap(); err != nil {
t.Fatal(err)
}
@@ -106,6 +103,14 @@ func TestBaseAppBootstrap(t *testing.T) {
t.Fatal("Expected app.settings to be initialized, got nil.")
}
if app.logger == nil {
t.Fatal("Expected app.logger to be initialized, got nil.")
}
if _, ok := app.logger.Handler().(*logger.BatchHandler); !ok {
t.Fatal("Expected app.logger handler to be initialized.")
}
// reset
if err := app.ResetBootstrapState(); err != nil {
t.Fatal(err)
@@ -127,7 +132,6 @@ func TestBaseAppGetters(t *testing.T) {
app := NewBaseApp(BaseAppConfig{
DataDir: testDataDir,
EncryptionEnv: "pb_test_env",
IsDebug: false,
})
defer app.ResetBootstrapState()
@@ -159,16 +163,16 @@ func TestBaseAppGetters(t *testing.T) {
t.Fatalf("Expected app.EncryptionEnv %v, got %v", app.EncryptionEnv(), app.encryptionEnv)
}
if app.isDebug != app.IsDebug() {
t.Fatalf("Expected app.IsDebug %v, got %v", app.IsDebug(), app.isDebug)
}
if app.settings != app.Settings() {
t.Fatalf("Expected app.Settings %v, got %v", app.Settings(), app.settings)
}
if app.cache != app.Cache() {
t.Fatalf("Expected app.Cache %v, got %v", app.Cache(), app.cache)
if app.store != app.Store() {
t.Fatalf("Expected app.Store %v, got %v", app.Store(), app.store)
}
if app.logger != app.Logger() {
t.Fatalf("Expected app.Logger %v, got %v", app.Logger(), app.logger)
}
if app.subscriptionsBroker != app.SubscriptionsBroker() {
@@ -187,7 +191,6 @@ func TestBaseAppNewMailClient(t *testing.T) {
app := NewBaseApp(BaseAppConfig{
DataDir: testDataDir,
EncryptionEnv: "pb_test_env",
IsDebug: false,
})
client1 := app.NewMailClient()
@@ -210,7 +213,6 @@ func TestBaseAppNewFilesystem(t *testing.T) {
app := NewBaseApp(BaseAppConfig{
DataDir: testDataDir,
EncryptionEnv: "pb_test_env",
IsDebug: false,
})
// local
@@ -240,7 +242,6 @@ func TestBaseAppNewBackupsFilesystem(t *testing.T) {
app := NewBaseApp(BaseAppConfig{
DataDir: testDataDir,
EncryptionEnv: "pb_test_env",
IsDebug: false,
})
// local
@@ -262,3 +263,68 @@ func TestBaseAppNewBackupsFilesystem(t *testing.T) {
t.Fatalf("Expected nil s3 backups filesystem, got %v", s3)
}
}
// TestBaseAppLoggerWrites verifies that the app batch logger persists entries
// to the logs database both when the batch size threshold (200) is reached
// and when the inactivity timer fires (~3s after the last appended log).
func TestBaseAppLoggerWrites(t *testing.T) {
	testDataDir, err := os.MkdirTemp("", "logger_writes")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(testDataDir)

	app := NewBaseApp(BaseAppConfig{
		DataDir: testDataDir,
	})

	if err := app.Bootstrap(); err != nil {
		t.Fatal(err)
	}

	// init logs migrations
	// (run explicitly so that the logs table exists for the count queries below)
	runner, err := migrate.NewRunner(app.LogsDB(), logs.LogsMigrations)
	if err != nil {
		t.Fatalf("Logs runner error: %v", err)
	}
	if _, err := runner.Up(); err != nil {
		t.Fatalf("Logs migration execution error: %v", err)
	}

	// test batch logs writes
	{
		threshold := 200

		// one short of the batch threshold -> nothing should be persisted yet
		for i := 0; i < threshold-1; i++ {
			app.Logger().Error("test")
		}

		if total := totalLogs(app, t); total != 0 {
			t.Fatalf("Expected %d logs, got %d", 0, total)
		}

		// should trigger batch write
		app.Logger().Error("test")

		// should be added for the next batch write
		app.Logger().Error("test")

		if total := totalLogs(app, t); total != threshold {
			t.Fatalf("Expected %d logs, got %d", threshold, total)
		}

		// wait for ~3 secs to check the timer trigger
		// (sleeps slightly past the 3s inactivity window so the single
		// remaining log is flushed by the ticker, not the batch threshold)
		time.Sleep(3200 * time.Millisecond)
		if total := totalLogs(app, t); total != threshold+1 {
			t.Fatalf("Expected %d logs, got %d", threshold+1, total)
		}
	}
}
// totalLogs returns the current number of rows in the logs table,
// failing the test immediately if the count query errors.
func totalLogs(app App, t *testing.T) int {
	var count int
	if err := app.LogsDao().LogQuery().Select("count(*)").Row(&count); err != nil {
		t.Fatalf("Failed to fetch total logs: %v", err)
	}
	return count
}