synced with master
This commit is contained in:
+23
-6
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/pocketbase/pocketbase/forms"
|
||||
"github.com/pocketbase/pocketbase/models"
|
||||
"github.com/pocketbase/pocketbase/resolvers"
|
||||
"github.com/pocketbase/pocketbase/tools/routine"
|
||||
"github.com/pocketbase/pocketbase/tools/search"
|
||||
"github.com/pocketbase/pocketbase/tools/subscriptions"
|
||||
)
|
||||
@@ -43,10 +44,14 @@ func (api *realtimeApi) connect(c echo.Context) error {
|
||||
client := subscriptions.NewDefaultClient()
|
||||
api.app.SubscriptionsBroker().Register(client)
|
||||
defer func() {
|
||||
api.app.OnRealtimeDisconnectRequest().Trigger(&core.RealtimeDisconnectEvent{
|
||||
disconnectEvent := &core.RealtimeDisconnectEvent{
|
||||
HttpContext: c,
|
||||
Client: client,
|
||||
})
|
||||
}
|
||||
|
||||
if err := api.app.OnRealtimeDisconnectRequest().Trigger(disconnectEvent); err != nil && api.app.IsDebug() {
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
api.app.SubscriptionsBroker().Unregister(client.Id())
|
||||
}()
|
||||
@@ -259,21 +264,27 @@ func (api *realtimeApi) bindEvents() {
|
||||
|
||||
api.app.OnModelAfterCreate().PreAdd(func(e *core.ModelEvent) error {
|
||||
if record, ok := e.Model.(*models.Record); ok {
|
||||
api.broadcastRecord("create", record)
|
||||
if err := api.broadcastRecord("create", record); err != nil && api.app.IsDebug() {
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
api.app.OnModelAfterUpdate().PreAdd(func(e *core.ModelEvent) error {
|
||||
if record, ok := e.Model.(*models.Record); ok {
|
||||
api.broadcastRecord("update", record)
|
||||
if err := api.broadcastRecord("update", record); err != nil && api.app.IsDebug() {
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
api.app.OnModelBeforeDelete().Add(func(e *core.ModelEvent) error {
|
||||
if record, ok := e.Model.(*models.Record); ok {
|
||||
api.broadcastRecord("delete", record)
|
||||
if err := api.broadcastRecord("delete", record); err != nil && api.app.IsDebug() {
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@@ -370,6 +381,8 @@ func (api *realtimeApi) broadcastRecord(action string, record *models.Record) er
|
||||
encodedData := string(dataBytes)
|
||||
|
||||
for _, client := range clients {
|
||||
client := client
|
||||
|
||||
for subscription, rule := range subscriptionRuleMap {
|
||||
if !client.HasSubscription(subscription) {
|
||||
continue
|
||||
@@ -398,7 +411,11 @@ func (api *realtimeApi) broadcastRecord(action string, record *models.Record) er
|
||||
}
|
||||
}
|
||||
|
||||
client.Channel() <- msg
|
||||
routine.FireAndForget(func() {
|
||||
if !client.IsDiscarded() {
|
||||
client.Channel() <- msg
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user