Realtime API - Python SDK
BosBase exposes realtime change feeds over Server-Sent Events (SSE). The Python SDK manages the SSE connection in a background thread and automatically resubscribes after reconnects.
Basic Usage
def on_post_update(event):
    """Print the action and the record title carried by a realtime event."""
    action = event["action"]
    title = event["record"]["title"]
    print(action, title)


# subscribe() hands back a callable that removes just this listener.
unsubscribe = pb.collection("posts").subscribe("*", on_post_update)
# Later
unsubscribe()
- Topic `"*"` listens to all records.
- Use a record ID to watch a single record: `subscribe("RECORD_ID", callback)`.
Subscription Options
Each subscription can include query params or headers (used by API rules):
# Query params and headers travel with the subscription (used by API rules).
subscription_query = {
    "filter": pb.filter("status = {:status}", {"status": "published"}),
    "expand": "author",
}
unsubscribe = pb.collection("posts").subscribe(
    "*",
    on_post_update,
    query=subscription_query,
    headers={"X-App-Instance": "cli"},
)
PB_CONNECT Event
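The SDK automatically listens for the PB_CONNECT event and stores the server-assigned clientId. To react to a dropped connection (and the automatic reconnect that follows while subscriptions remain), set pb.realtime.on_disconnect; the callback receives the currently active topics.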
def handle_disconnect(active_topics):
    """Report a dropped realtime connection.

    Args:
        active_topics: Truthy while subscriptions remain (the SDK then
            auto-reconnects); falsy when none are left.
    """
    if not active_topics:
        print("No active subscriptions; connection closed.")
        return
    print("Connection lost, waiting for auto-reconnect…")


pb.realtime.on_disconnect = handle_disconnect
Custom Realtime Topics
pb.realtime can subscribe to raw topics for custom realtime events emitted by server hooks.
def on_job(event):
    """Print the payload of a custom "jobs/finished" realtime event."""
    print(event)


# Raw topics are subscribed on pb.realtime directly, not through a collection.
pb.realtime.subscribe("jobs/finished", on_job)
Unsubscribing
- `unsubscribe(topic)` removes every listener for that topic.
- `unsubscribe_by_prefix("posts/")` removes all collection listeners.
- The returned callable from `subscribe()` removes the specific listener only.
# Remove every listener registered for this record topic on "posts".
pb.collection("posts").unsubscribe("RECORD_ID")
pb.realtime.unsubscribe() # remove every topic
Threading Notes
- Callbacks execute on the SSE thread. If you mutate shared state, protect it with locks.
- The SSE loop automatically restarts unless all subscriptions are removed.
Tips
- Always expand required relations in the subscription query; event payloads do not re-fetch data.
- Use `filter` query params to reduce server load and network traffic.
- Combine realtime events with local caches to keep UI lists in sync.
- When using OAuth2 auth flows, the SDK internally subscribes to the `@oauth2` topic and cleans it up automatically—no manual action required.
Complete Examples
Example 1: Real-time Chat
from bosbase import BosBase
pb = BosBase("http://127.0.0.1:8090")
def setup_chat_room(room_id: str):
    """Subscribe to create/delete events for the messages of one chat room.

    Args:
        room_id: ID of the room whose messages should be watched.

    Returns:
        The callable returned by ``subscribe()``; call it to remove this
        listener.
    """

    def on_message(event):
        record = event["record"]
        # Defensive client-side check on top of the subscription filter.
        if record.get("roomId") != room_id:
            return
        action = event["action"]
        if action == "create":
            display_message(record)
        elif action == "delete":
            remove_message(record["id"])

    # Bind the room ID through pb.filter() placeholders instead of splicing
    # it into the expression, so a quote in room_id cannot break or alter
    # the filter.
    room_filter = pb.filter("roomId = {:roomId}", {"roomId": room_id})
    return pb.collection("messages").subscribe(
        "*",
        on_message,
        query={"filter": room_filter},
    )
def display_message(record):
    """Print the content of a newly created chat message."""
    content = record.get("content")
    print(f"New message: {content}")
def remove_message(message_id: str):
    """Print a notice that the message with the given ID was deleted."""
    notice = f"Message deleted: {message_id}"
    print(notice)
# Usage: keep the returned callable so the room listener can be removed later.
unsubscribe_chat = setup_chat_room("ROOM_ID")
# Cleanup: call the stored callable to remove this room's listener.
# unsubscribe_chat()
Example 2: Real-time Dashboard
from bosbase import BosBase
pb = BosBase("http://127.0.0.1:8090")
def setup_dashboard():
    """Subscribe to the posts and comments collections for dashboard updates.

    Returns:
        A list holding the unsubscribe callables for the posts listener and
        the comments listener, in that order, so the caller can remove them
        individually later.
    """

    def on_post(event):
        # Only create/update touch the feed; other actions are ignored.
        if event["action"] == "create":
            add_post_to_feed(event["record"])
        elif event["action"] == "update":
            update_post_in_feed(event["record"])

    def on_comment(event):
        update_comments_count(event["record"].get("postId"))

    # Posts updates
    unsubscribe_posts = pb.collection("posts").subscribe(
        "*",
        on_post,
        query={
            "filter": 'status = "published"',
            "expand": "author",
        },
    )
    # Comments updates
    unsubscribe_comments = pb.collection("comments").subscribe(
        "*",
        on_comment,
        query={"expand": "user"},
    )
    return [unsubscribe_posts, unsubscribe_comments]
def add_post_to_feed(record):
    """Print the title of a post that was just created."""
    title = record.get("title")
    print(f"New post: {title}")
def update_post_in_feed(record):
    """Print the title of a post that was just updated."""
    title = record.get("title")
    print(f"Post updated: {title}")
def update_comments_count(post_id: str):
    """Print a notice that the comments of the given post changed."""
    notice = f"Comments updated for post: {post_id}"
    print(notice)
# Usage: registers the posts and comments listeners; their callbacks then
# run on the SSE thread.
setup_dashboard()
Example 3: User Activity Tracking
from bosbase import BosBase
pb = BosBase("http://127.0.0.1:8090")
def track_user_activity(user_id: str):
    """Track changes to a user's own posts.

    Args:
        user_id: ID of the user whose posts should be watched.

    Returns:
        The callable returned by ``subscribe()``; call it to remove this
        listener.
    """

    def on_user_post(event):
        record = event["record"]
        # Only track changes to the user's own posts.
        if record.get("author") != user_id:
            return
        print(f"Your post {event['action']}: {record.get('title')}")
        if event["action"] == "update":
            show_notification("Post updated")

    # Bind the user ID through pb.filter() placeholders instead of splicing
    # it into the expression, so a quote in user_id cannot break or alter
    # the filter.
    author_filter = pb.filter("author = {:author}", {"author": user_id})
    return pb.collection("posts").subscribe(
        "*",
        on_user_post,
        query={"filter": author_filter},
    )
def show_notification(message: str):
    """Print a notification line for the user."""
    line = f"Notification: {message}"
    print(line)
# Usage: authenticate first, then track the signed-in user's own posts.
pb.collection("users").auth_with_password("user@example.com", "password")
# The authenticated record is read back from the auth store after login.
user_id = pb.auth_store.record["id"]
track_user_activity(user_id)
Example 4: Real-time Collaboration
from bosbase import BosBase
pb = BosBase("http://127.0.0.1:8090")
def track_document_edits(document_id: str):
    """Watch a single document and report who edited it last.

    Args:
        document_id: Record ID of the document; it is used as the
            subscription topic, so only that record is watched.

    Returns:
        The callable returned by ``subscribe()``; call it to remove this
        listener.
    """

    def on_document_update(event):
        # Only updates carry a new editor; ignore every other action.
        if event["action"] != "update":
            return
        record = event["record"]
        # Show who last edited the document
        show_editor_indicator(record.get("lastEditor"), record.get("updated"))

    return pb.collection("documents").subscribe(
        document_id,
        on_document_update,
        query={"expand": "lastEditor"},
    )
def show_editor_indicator(editor_id: str, updated_at: str):
    """Print which editor last changed the document and when."""
    line = f"Document edited by {editor_id} at {updated_at}"
    print(line)
# Usage: the document ID is passed as the topic, so only that record is watched.
track_document_edits("DOCUMENT_ID")
Example 5: Connection Monitoring
from bosbase import BosBase
pb = BosBase("http://127.0.0.1:8090")
def show_connection_status(status: str):
    """Update connection status in UI."""
    print(f"Connection status: {status}")


def handle_disconnect(active_topics):
    """Handle disconnection events.

    Args:
        active_topics: Truthy while subscriptions remain (a reconnect is
            then attempted); falsy when none are left.
    """
    if active_topics:
        print("Connection lost, attempting to reconnect...")
        show_connection_status("Reconnecting...")
    else:
        print("No active subscriptions; connection closed.")


def on_connect(event):
    """Handle connection establishment."""
    print(f"Connected to realtime: {event.get('clientId')}")
    show_connection_status("Connected")


# Register the callbacks only after every helper they call is defined:
# callbacks execute on the SSE thread, so they could otherwise fire before
# this script has reached the helper definitions.

# Monitor connection state
pb.realtime.on_disconnect = handle_disconnect
# Monitor connection establishment
pb.realtime.subscribe("PB_CONNECT", on_connect)
Example 6: Conditional Subscriptions
from bosbase import BosBase
pb = BosBase("http://127.0.0.1:8090")
def handler(event):
    """Print the action and the record title of a post event."""
    action = event["action"]
    title = event["record"].get("title")
    print(f"Post {action}: {title}")
def setup_conditional_subscriptions():
    """Subscribe to posts with a filter chosen from the current auth state.

    Returns:
        The callable returned by ``subscribe()``; call it to remove this
        listener (for example before re-subscribing after login/logout).
    """
    if pb.auth_store.is_valid():
        # Authenticated user - subscribe to private posts
        post_filter = '@request.auth.id != ""'
    else:
        # Guest user - subscribe only to public posts
        post_filter = 'public = true'

    # Both branches differ only in the filter, so subscribe once.
    return pb.collection("posts").subscribe(
        "*",
        handler,
        query={"filter": post_filter},
    )
# Usage: picks the subscription filter from the current auth state.
setup_conditional_subscriptions()
Example 7: Cleanup on Application Exit
from bosbase import BosBase
import atexit
pb = BosBase("http://127.0.0.1:8090")
class RealtimeSubscription:
    """Manage realtime subscriptions and release them on demand or at exit."""

    def __init__(self):
        # Unsubscribe callables returned by subscribe(), in registration order.
        self.subscriptions = []
        # Release the listeners at interpreter exit even if cleanup() is
        # never called explicitly.
        atexit.register(self.cleanup)

    def subscribe(self, collection_name: str, topic: str, handler, **options):
        """Subscribe to a topic and track the unsubscribe function.

        Args:
            collection_name: Name of the collection to subscribe on.
            topic: ``"*"`` for all records or a single record ID.
            handler: Callback invoked with each event.
            **options: Extra keyword arguments (such as ``query`` or
                ``headers``) forwarded unchanged to the collection's
                ``subscribe()``.

        Returns:
            The unsubscribe callable for this specific listener.
        """
        unsubscribe = pb.collection(collection_name).subscribe(
            topic, handler, **options
        )
        self.subscriptions.append(unsubscribe)
        return unsubscribe

    def cleanup(self):
        """Unsubscribe from all tracked subscriptions and forget them."""
        for unsubscribe in self.subscriptions:
            try:
                unsubscribe()
            except Exception as e:
                # Best effort: one failing listener must not block the rest.
                print(f"Error unsubscribing: {e}")
        self.subscriptions.clear()
# Usage
def on_post(event):
    """Print every change event received for the "posts" collection."""
    print(f"Post changed: {event}")


rt = RealtimeSubscription()
rt.subscribe("posts", "*", on_post)
# Cleanup happens automatically on exit
# Or manually: rt.cleanup()
Error Handling
from bosbase import BosBase
from bosbase.exceptions import ClientResponseError
pb = BosBase("http://127.0.0.1:8090")
def handler(event):
    """Print the action of a realtime event."""
    action = event["action"]
    print(f"Event: {action}")
try:
    pb.collection("posts").subscribe("*", handler)
except ClientResponseError as error:
    # Statuses with a dedicated message; anything else is reported generically.
    known_failures = {
        403: "Permission denied",
        404: "Collection not found",
    }
    status = error.status
    if status in known_failures:
        print(known_failures[status])
    else:
        print(f"Subscription error: {error}")
except Exception as error:
    # Top-level boundary of the example: report anything unexpected.
    print(f"Unexpected error: {error}")