Realtime API - Go SDK Documentation
Overview
The Realtime API enables real-time updates for collection records using Server-Sent Events (SSE). It allows you to subscribe to changes in collections or specific records and receive instant notifications when records are created, updated, or deleted.
Key Features:
- Real-time notifications for record changes
- Collection-level and record-level subscriptions
- Automatic connection management and reconnection
- Authorization support
- Subscription options (expand, custom headers, query params)
- Event-driven architecture
Backend Endpoints:
- GET /api/realtime - Establish SSE connection
- POST /api/realtime - Set subscriptions
How It Works
- Connection: The SDK establishes an SSE connection to /api/realtime
- Client ID: The server sends a PB_CONNECT event with a unique clientId
- Subscriptions: The client submits subscription topics via a POST request
- Events: The server sends events when matching records change
- Reconnection: The SDK automatically reconnects on connection loss
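The SDK handles the SSE stream for you, but it can help to see what travels over the wire. The following is a minimal sketch of a parser for the standard SSE text format, using only the Go standard library. It is not the SDK's own implementation, and the sample stream (including the `posts/*` event name and the payload contents) is an assumption made for illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// sseEvent holds the fields of one Server-Sent Events message.
type sseEvent struct {
	ID    string
	Event string
	Data  string
}

// parseSSE reads SSE messages from r. A blank line terminates each message,
// lines starting with ':' are comments, and repeated "data" lines are joined
// with newlines.
func parseSSE(r io.Reader) ([]sseEvent, error) {
	var (
		events    []sseEvent
		cur       sseEvent
		dataLines []string
		seen      bool // true once the current message has at least one known field
	)
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			if seen {
				cur.Data = strings.Join(dataLines, "\n")
				events = append(events, cur)
			}
			cur, dataLines, seen = sseEvent{}, nil, false
			continue
		}
		if strings.HasPrefix(line, ":") {
			continue // comment or keep-alive line
		}
		field, value, _ := strings.Cut(line, ":")
		value = strings.TrimPrefix(value, " ")
		switch field {
		case "id":
			cur.ID = value
		case "event":
			cur.Event = value
		case "data":
			dataLines = append(dataLines, value)
		default:
			continue // unknown fields are ignored
		}
		seen = true
	}
	return events, scanner.Err()
}

// parseSSEString is a convenience wrapper for in-memory input.
func parseSSEString(s string) ([]sseEvent, error) {
	return parseSSE(strings.NewReader(s))
}

func main() {
	// Hypothetical stream: a connect event followed by one record event.
	stream := "id:abc123\nevent:PB_CONNECT\ndata:{\"clientId\":\"abc123\"}\n\n" +
		"event:posts/*\ndata:{\"action\":\"create\"}\n\n"
	events, err := parseSSEString(stream)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, ev := range events {
		fmt.Printf("%s -> %s\n", ev.Event, ev.Data)
	}
}
```

In normal use none of this is needed: subscribing through the SDK sets up the connection and delivers decoded events to your handler.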
Basic Usage
Subscribe to Collection Changes
Subscribe to all changes in a collection:
package main
import (
"fmt"
"log"
bosbase "github.com/bosbase/go-sdk"
)
func main() {
client := bosbase.New("http://127.0.0.1:8090")
defer client.Close()
// Subscribe to all changes in the 'posts' collection
unsubscribe, err := client.Collection("posts").Subscribe("*", func(e map[string]interface{}) {
action, _ := e["action"].(string)
record, _ := e["record"].(map[string]interface{})
fmt.Printf("Action: %s\n", action)
fmt.Printf("Record: %v\n", record)
}, nil, nil)
if err != nil {
log.Fatal(err)
}
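// Unsubscribe when main returns. A real program must keep running
// (for example by blocking on a signal) to receive any events.
defer unsubscribe()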
}
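Because the example's main function returns right after subscribing, a real program needs something that keeps it alive while events arrive. One possible approach, sketched here with the standard library only, is to block until Ctrl+C or a time limit and then unsubscribe. The `subscribeAndWait` helper and the placeholder subscribe function are hypothetical; in real code the placeholder would wrap a call such as `client.Collection("posts").Subscribe(...)`.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"
)

// subscribeAndWait calls subscribe, blocks until the process receives an
// interrupt (Ctrl+C) or maxWait elapses, and then calls the returned
// unsubscribe function.
func subscribeAndWait(maxWait time.Duration, subscribe func() (func(), error)) error {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()
	ctx, cancel := context.WithTimeout(ctx, maxWait)
	defer cancel()

	unsubscribe, err := subscribe()
	if err != nil {
		return err
	}
	defer unsubscribe()

	<-ctx.Done() // events would be delivered to the handler while we wait
	return nil
}

func main() {
	// Placeholder standing in for a real SDK subscription (assumption).
	subscribe := func() (func(), error) {
		fmt.Println("subscribed; press Ctrl+C to stop")
		return func() { fmt.Println("unsubscribed") }, nil
	}

	// The short limit keeps this demo brief; a service would wait much longer
	// or rely on the interrupt signal alone.
	if err := subscribeAndWait(500*time.Millisecond, subscribe); err != nil {
		fmt.Println("subscribe failed:", err)
	}
}
```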
Subscribe to Specific Record
Subscribe to changes for a single record:
// Subscribe to changes for a specific post
unsubscribe, err := client.Collection("posts").Subscribe("RECORD_ID", func(e map[string]interface{}) {
record, _ := e["record"].(map[string]interface{})
action, _ := e["action"].(string)
fmt.Printf("Record changed: %v\n", record)
fmt.Printf("Action: %s\n", action)
}, nil, nil)
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
Multiple Subscriptions
You can subscribe multiple times to the same or different topics:
// Handlers are declared at package level
func handleChange(e map[string]interface{}) {
fmt.Println("Change event:", e)
}
func handleAllChanges(e map[string]interface{}) {
fmt.Println("Collection-wide change:", e)
}
// Inside a function: subscribe to two records and to the whole collection
unsubscribe1, err := client.Collection("posts").Subscribe("RECORD_ID_1", handleChange, nil, nil)
if err != nil {
log.Fatal(err)
}
defer unsubscribe1() // Each subscription is removed individually
unsubscribe2, err := client.Collection("posts").Subscribe("RECORD_ID_2", handleChange, nil, nil)
if err != nil {
log.Fatal(err)
}
defer unsubscribe2()
unsubscribe3, err := client.Collection("posts").Subscribe("*", handleAllChanges, nil, nil)
if err != nil {
log.Fatal(err)
}
defer unsubscribe3()
Event Structure
Each event received contains:
{
"action": "create" | "update" | "delete", // Action type
"record": { // Record data
"id": "RECORD_ID",
"collectionId": "COLLECTION_ID",
"collectionName": "collection_name",
"created": "2023-01-01 00:00:00.000Z",
"updated": "2023-01-01 00:00:00.000Z",
// ... other fields
}
}
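Handlers receive the event as a `map[string]interface{}`, so every field access needs a type assertion. A small decoding helper can centralize those checks. The sketch below is not part of the SDK; the `recordEvent` type and `decodeEvent` function are hypothetical names, and the set of valid actions is taken from the structure shown above.

```go
package main

import (
	"errors"
	"fmt"
)

// recordEvent is a typed view of a realtime event.
type recordEvent struct {
	Action string
	Record map[string]interface{}
}

// decodeEvent validates the raw event map and converts it to a recordEvent.
func decodeEvent(e map[string]interface{}) (recordEvent, error) {
	action, ok := e["action"].(string)
	if !ok {
		return recordEvent{}, errors.New(`event has no string "action" field`)
	}
	switch action {
	case "create", "update", "delete":
	default:
		return recordEvent{}, fmt.Errorf("unknown action %q", action)
	}
	record, ok := e["record"].(map[string]interface{})
	if !ok {
		return recordEvent{}, errors.New(`event has no object "record" field`)
	}
	return recordEvent{Action: action, Record: record}, nil
}

// ID returns the record's "id" field, or "" if it is missing or not a string.
func (ev recordEvent) ID() string {
	id, _ := ev.Record["id"].(string)
	return id
}

func main() {
	raw := map[string]interface{}{
		"action": "update",
		"record": map[string]interface{}{"id": "RECORD_ID", "title": "Hello"},
	}
	ev, err := decodeEvent(raw)
	if err != nil {
		fmt.Println("bad event:", err)
		return
	}
	fmt.Println(ev.Action, ev.ID())
}
```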
PB_CONNECT Event
When the connection is established, you receive a PB_CONNECT event:
unsubscribe, err := client.Realtime.Subscribe("PB_CONNECT", func(e map[string]interface{}) {
clientID, _ := e["clientId"].(string)
fmt.Printf("Connected! Client ID: %s\n", clientID)
}, nil, nil)
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
Subscription Topics
Collection-Level Subscription
Subscribe to all changes in a collection:
// Wildcard subscription - all records in collection
unsubscribe, err := client.Collection("posts").Subscribe("*", handler, nil, nil)
Access Control: Uses the collection’s ListRule to determine if the subscriber has access to receive events.
Record-Level Subscription
Subscribe to changes for a specific record:
// Specific record subscription
unsubscribe, err := client.Collection("posts").Subscribe("RECORD_ID", handler, nil, nil)
Access Control: Uses the collection’s ViewRule to determine if the subscriber has access to receive events.
Subscription Options
You can pass additional options when subscribing:
query := map[string]interface{}{
"filter": "status = \"published\"",
"expand": "author",
}
headers := map[string]string{
"X-Custom-Header": "value",
}
unsubscribe, err := client.Collection("posts").Subscribe("*", handler, query, headers)
Expand Relations
Expand relations in the event data:
query := map[string]interface{}{
"expand": "author,categories",
}
unsubscribe, err := client.Collection("posts").Subscribe("RECORD_ID", func(e map[string]interface{}) {
record, _ := e["record"].(map[string]interface{})
expand, _ := record["expand"].(map[string]interface{})
author, _ := expand["author"].(map[string]interface{})
fmt.Println("Author relation expanded:", author)
}, query, nil)
Filter with Query Parameters
Pass a filter expression as a query parameter so that events are filtered server-side:
query := map[string]interface{}{
"filter": "status = \"published\"",
}
unsubscribe, err := client.Collection("posts").Subscribe("*", handler, query, nil)
Unsubscribing
Unsubscribe from Specific Topic
// Remove all subscriptions for a specific record
client.Collection("posts").Unsubscribe("RECORD_ID")
// Remove all wildcard subscriptions for the collection
client.Collection("posts").Unsubscribe("*")
Unsubscribe from All
// Unsubscribe from all subscriptions in the collection
client.Collection("posts").Unsubscribe("")
// Or unsubscribe from everything
client.Realtime.Unsubscribe("")
Unsubscribe Using Returned Function
unsubscribe, err := client.Collection("posts").Subscribe("*", handler, nil, nil)
if err != nil {
log.Fatal(err)
}
// Later...
unsubscribe() // Removes this specific subscription
Connection Management
Connection Status
Check if the realtime connection is established:
// Note: The Go SDK manages connection state internally
// Connection is established automatically when subscribing
Disconnect Handler
Handle disconnection events:
client.Realtime.OnDisconnect = func(activeSubscriptions []string) {
if len(activeSubscriptions) > 0 {
fmt.Printf("Connection lost, but subscriptions remain: %v\n", activeSubscriptions)
// Connection will automatically reconnect
} else {
fmt.Println("Intentionally disconnected (no active subscriptions)")
}
}
Automatic Reconnection
The SDK automatically:
- Reconnects when the connection is lost
- Resubmits all active subscriptions
- Handles network interruptions gracefully
- Closes connection after 5 minutes of inactivity (server-side timeout)
Authorization
Authenticated Subscriptions
Subscriptions respect authentication. If you’re authenticated, events are filtered based on your permissions:
// Authenticate first
_, err := client.Collection("users").AuthWithPassword(
"user@example.com", "password123", "", "", nil, nil, nil)
if err != nil {
log.Fatal(err)
}
// Now subscribe - events will respect your permissions
unsubscribe, err := client.Collection("posts").Subscribe("*", handler, nil, nil)
Authorization Rules
- Collection-level (*): Uses ListRule to determine access
- Record-level: Uses ViewRule to determine access
- Superusers: Can receive all events (if rules allow)
- Guests: Only receive events they have permission to see
Auth State Changes
When authentication state changes, you may need to resubscribe:
// After login/logout, resubscribe to update permissions
_, err := client.Collection("users").AuthWithPassword(
"user@example.com", "password123", "", "", nil, nil, nil)
if err != nil {
log.Fatal(err)
}
// Re-subscribe to update auth state in realtime connection
unsubscribe, err := client.Collection("posts").Subscribe("*", handler, nil, nil)
Advanced Examples
Example 1: Real-time Chat
// Subscribe to messages in a chat room
func setupChatRoom(client *bosbase.BosBase, roomID string) (func(), error) {
query := map[string]interface{}{
"filter": fmt.Sprintf(`roomId = "%s"`, roomID),
}
unsubscribe, err := client.Collection("messages").Subscribe("*", func(e map[string]interface{}) {
action, _ := e["action"].(string)
record, _ := e["record"].(map[string]interface{})
// Filter for this room only
recordRoomID, _ := record["roomId"].(string)
if recordRoomID == roomID {
if action == "create" {
displayMessage(record)
} else if action == "delete" {
// Guard the type assertion so a missing or non-string id cannot panic
if id, ok := record["id"].(string); ok {
removeMessage(id)
}
}
}
}, query, nil)
return unsubscribe, err
}
// Usage
unsubscribeChat, err := setupChatRoom(client, "ROOM_ID")
if err != nil {
log.Fatal(err)
}
defer unsubscribeChat()
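The filter in the chat example is built by inserting roomID directly into the expression. If that value can come from user input, it may be worth validating it first. The sketch below uses an allowlist; the `roomFilter` helper is hypothetical, and the permitted character set (letters, digits, underscore, hyphen) is an assumption that should be adjusted to the ID format your collections actually use.

```go
package main

import (
	"fmt"
	"regexp"
)

// safeID matches IDs made only of letters, digits, '_' and '-'.
// The allowed character set is an assumption for this sketch.
var safeID = regexp.MustCompile(`^[A-Za-z0-9_-]+$`)

// roomFilter returns a filter expression for roomID, or an error if the ID
// contains characters that could alter the expression.
func roomFilter(roomID string) (string, error) {
	if !safeID.MatchString(roomID) {
		return "", fmt.Errorf("invalid room ID %q", roomID)
	}
	return fmt.Sprintf(`roomId = "%s"`, roomID), nil
}

func main() {
	for _, id := range []string{"room42", `x" || roomId != "`} {
		filter, err := roomFilter(id)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Println("filter:", filter)
	}
}
```

The returned string can then be placed in the "filter" entry of the query map passed to Subscribe.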
Example 2: Real-time Dashboard
// Subscribe to multiple collections
func setupDashboard(client *bosbase.BosBase) error {
// Posts updates
query1 := map[string]interface{}{
"filter": "status = \"published\"",
"expand": "author",
}
_, err := client.Collection("posts").Subscribe("*", func(e map[string]interface{}) {
action, _ := e["action"].(string)
record, _ := e["record"].(map[string]interface{})
if action == "create" {
addPostToFeed(record)
} else if action == "update" {
updatePostInFeed(record)
}
}, query1, nil)
if err != nil {
return err
}
// Comments updates
query2 := map[string]interface{}{
"expand": "user",
}
_, err = client.Collection("comments").Subscribe("*", func(e map[string]interface{}) {
record, _ := e["record"].(map[string]interface{})
postID, _ := record["postId"].(string)
updateCommentsCount(postID)
}, query2, nil)
return err
}
Example 3: User Activity Tracking
// Track changes to a user's own records
func trackUserActivity(client *bosbase.BosBase, userID string) error {
query := map[string]interface{}{
"filter": fmt.Sprintf(`author = "%s"`, userID),
}
_, err := client.Collection("posts").Subscribe("*", func(e map[string]interface{}) {
action, _ := e["action"].(string)
record, _ := e["record"].(map[string]interface{})
title, _ := record["title"].(string)
fmt.Printf("Your post %s: %s\n", action, title)
if action == "update" {
showNotification("Post updated")
}
}, query, nil)
return err
}
// Usage
authRecord := client.AuthStore.Record()
userID, _ := authRecord["id"].(string)
if err := trackUserActivity(client, userID); err != nil {
log.Fatal(err)
}
Example 4: Connection Monitoring
// Monitor connection state
client.Realtime.OnDisconnect = func(activeSubscriptions []string) {
if len(activeSubscriptions) > 0 {
fmt.Println("Connection lost, attempting to reconnect...")
showConnectionStatus("Reconnecting...")
}
}
// Monitor connection establishment
_, err := client.Realtime.Subscribe("PB_CONNECT", func(e map[string]interface{}) {
clientID, _ := e["clientId"].(string)
fmt.Printf("Connected to realtime: %s\n", clientID)
showConnectionStatus("Connected")
}, nil, nil)
if err != nil {
log.Fatal(err)
}
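If other parts of the application need to query the connection state, the two callbacks above can feed a small shared tracker. This is a sketch under the assumption that the callbacks may run on a goroutine other than the one reading the state, hence the mutex. The `connStatus` type and its method names are hypothetical and not part of the SDK.

```go
package main

import (
	"fmt"
	"sync"
)

// connStatus records the latest known realtime connection state.
type connStatus struct {
	mu        sync.Mutex
	connected bool
	clientID  string
	drops     int // unexpected disconnects (subscriptions were still active)
}

// onConnect has the same signature as a PB_CONNECT event handler.
func (s *connStatus) onConnect(e map[string]interface{}) {
	id, _ := e["clientId"].(string)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.connected = true
	s.clientID = id
}

// onDisconnect has the same signature as the OnDisconnect callback.
func (s *connStatus) onDisconnect(activeSubscriptions []string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.connected = false
	if len(activeSubscriptions) > 0 {
		s.drops++
	}
}

// snapshot returns a consistent copy of the current state.
func (s *connStatus) snapshot() (connected bool, clientID string, drops int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.connected, s.clientID, s.drops
}

func main() {
	status := &connStatus{}

	// With the SDK, the wiring would look roughly like:
	//   client.Realtime.OnDisconnect = status.onDisconnect
	//   client.Realtime.Subscribe("PB_CONNECT", status.onConnect, nil, nil)
	// Here the callbacks are invoked by hand to simulate a connection cycle.
	status.onConnect(map[string]interface{}{"clientId": "abc123"})
	status.onDisconnect([]string{"posts/*"})

	connected, id, drops := status.snapshot()
	fmt.Println(connected, id, drops)
}
```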
Error Handling
unsubscribe, err := client.Collection("posts").Subscribe("*", handler, nil, nil)
if err != nil {
if clientErr, ok := err.(*bosbase.ClientResponseError); ok {
if clientErr.Status == 403 {
fmt.Println("Permission denied")
} else if clientErr.Status == 404 {
fmt.Println("Collection not found")
} else {
fmt.Printf("Subscription error: %v\n", err)
}
} else {
fmt.Printf("Subscription error: %v\n", err)
}
return
}
defer unsubscribe()
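A direct type assertion like the one above only matches when the error has not been wrapped. If your own code wraps errors with `%w`, `errors.As` is the more robust check. The sketch below uses a local stand-in type, `responseError`, because it must compile without the SDK; it is an assumption that mirrors only the Status field used above, and in real code the target would be `*bosbase.ClientResponseError`.

```go
package main

import (
	"errors"
	"fmt"
)

// responseError is a hypothetical stand-in for the SDK's response error type.
type responseError struct {
	Status int
}

func (e *responseError) Error() string {
	return fmt.Sprintf("response error: status %d", e.Status)
}

// describe maps an error to a user-facing message, looking through any
// wrapping layers for a *responseError.
func describe(err error) string {
	var respErr *responseError
	if errors.As(err, &respErr) {
		switch respErr.Status {
		case 403:
			return "permission denied"
		case 404:
			return "collection not found"
		}
		return fmt.Sprintf("request failed with status %d", respErr.Status)
	}
	return "subscription error: " + err.Error()
}

func main() {
	wrapped := fmt.Errorf("subscribe to posts: %w", &responseError{Status: 403})
	fmt.Println(describe(wrapped))
	fmt.Println(describe(errors.New("network unreachable")))
}
```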
Best Practices
- Unsubscribe When Done: Always unsubscribe when subscriptions are no longer needed
- Handle Disconnections: Implement an OnDisconnect handler for better UX
- Filter Server-Side: Use query parameters to filter events server-side when possible
- Limit Subscriptions: Don’t subscribe to more collections than necessary
- Use Record-Level When Possible: Prefer record-level subscriptions over collection-level when you only need specific records
- Monitor Connection: Track connection state for debugging and user feedback
- Handle Errors: Wrap subscriptions in error handling
- Respect Permissions: Understand that events respect API rules and permissions
Limitations
- Maximum Subscriptions: Up to 1000 subscriptions per client
- Topic Length: Maximum 2500 characters per topic
- Idle Timeout: Connection closes after 5 minutes of inactivity
- Network Dependency: Requires stable network connection
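The first two limits can be checked on the client before subscribing, which gives a clearer error than a rejected request. The following is a rough sketch: `checkLimits` is a hypothetical helper, and it counts topic length in Unicode characters, which is an assumption since the server might measure bytes instead.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

const (
	maxSubscriptions = 1000 // per client, from the limits listed above
	maxTopicLen      = 2500 // characters per topic, from the limits listed above
)

// checkLimits reports an error if the topic list would exceed the documented
// limits.
func checkLimits(topics []string) error {
	if len(topics) > maxSubscriptions {
		return fmt.Errorf("%d subscriptions exceed the limit of %d", len(topics), maxSubscriptions)
	}
	for i, topic := range topics {
		if n := utf8.RuneCountInString(topic); n > maxTopicLen {
			return fmt.Errorf("topic %d is %d characters long, limit is %d", i, n, maxTopicLen)
		}
	}
	return nil
}

func main() {
	if err := checkLimits([]string{"posts/*", "posts/RECORD_ID"}); err != nil {
		fmt.Println("limit check failed:", err)
		return
	}
	fmt.Println("topics are within limits")
}
```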
Troubleshooting
Connection Not Establishing
// Connection is established automatically when subscribing
// If connection fails, check network connectivity and server status
Events Not Received
- Check API rules - you may not have permission
- Verify subscription is active
- Check network connectivity
- Review server logs for errors
Memory Leaks
Always unsubscribe:
// Good
unsubscribe, err := client.Collection("posts").Subscribe("*", handler, nil, nil)
if err != nil {
log.Fatal(err)
}
// ... later
unsubscribe()
// Bad - no cleanup
_, _ = client.Collection("posts").Subscribe("*", handler, nil, nil)
// Never unsubscribed - memory leak!
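When a component opens several subscriptions, one way to avoid forgetting a cleanup is to collect the returned functions in one place and release them together. The sketch below shows that pattern with a hypothetical `subscriptionSet` type. The `fakeSubscribe` function only stands in for an SDK call so the example runs on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriptionSet collects unsubscribe functions so they can be released together.
type subscriptionSet struct {
	mu      sync.Mutex
	cancels []func()
}

// add records unsubscribe unless err is non-nil. Its parameters match the
// (func(), error) pair returned by Subscribe, so a call can be passed directly.
func (s *subscriptionSet) add(unsubscribe func(), err error) error {
	if err != nil {
		return err
	}
	s.mu.Lock()
	s.cancels = append(s.cancels, unsubscribe)
	s.mu.Unlock()
	return nil
}

// closeAll calls every recorded unsubscribe function once, newest first.
// Calling it again is a no-op.
func (s *subscriptionSet) closeAll() {
	s.mu.Lock()
	cancels := s.cancels
	s.cancels = nil
	s.mu.Unlock()
	for i := len(cancels) - 1; i >= 0; i-- {
		cancels[i]()
	}
}

// fakeSubscribe stands in for an SDK Subscribe call (assumption for the demo).
func fakeSubscribe(topic string) (func(), error) {
	fmt.Println("subscribed to", topic)
	return func() { fmt.Println("unsubscribed from", topic) }, nil
}

func main() {
	var subs subscriptionSet
	defer subs.closeAll()

	for _, topic := range []string{"posts", "comments"} {
		if err := subs.add(fakeSubscribe(topic)); err != nil {
			fmt.Println("subscribe failed:", err)
			return
		}
	}
}
```

With the SDK, the same shape would be `subs.add(client.Collection("posts").Subscribe("*", handler, nil, nil))`.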
Related Documentation
- API Records - CRUD operations
- Collections - Collection configuration
- API Rules and Filters - Understanding API rules