Realtime API - Rust SDK Documentation
Overview
The Realtime API enables real-time updates for collection records using Server-Sent Events (SSE). It allows you to subscribe to changes in collections or specific records and receive instant notifications when records are created, updated, or deleted.
Key Features:
- Real-time notifications for record changes
- Collection-level and record-level subscriptions
- Automatic connection management and reconnection
- Authorization support
- Subscription options (expand, custom headers, query params)
- Event-driven architecture
Backend Endpoints:
GET /api/realtime- Establish SSE connectionPOST /api/realtime- Set subscriptions
How It Works
- Connection: The SDK establishes an SSE connection to
/api/realtime - Client ID: Server sends
PB_CONNECTevent with a uniqueclientId - Subscriptions: Client submits subscription topics via POST request
- Events: Server sends events when matching records change
- Reconnection: SDK automatically reconnects on connection loss
Basic Usage
Subscribe to Collection Changes
Subscribe to all changes in a collection:
use bosbase::BosBase;
use serde_json::Value;
use std::collections::HashMap;
let pb = BosBase::new("http://127.0.0.1:8090");
// Subscribe to all changes in the 'posts' collection
let unsubscribe = pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Action: {}", e["action"]);
println!("Record: {:?}", e["record"]);
},
HashMap::new(),
HashMap::new()
)?;
// Later, unsubscribe
unsubscribe();
Subscribe to Specific Record
Subscribe to changes for a single record:
// Subscribe to changes for a specific post
pb.collection("posts").subscribe(
"RECORD_ID",
|e: Value| {
println!("Record changed: {:?}", e["record"]);
println!("Action: {}", e["action"]);
},
HashMap::new(),
HashMap::new()
)?;
Multiple Subscriptions
You can subscribe multiple times to the same or different topics:
fn handle_change(e: Value) {
println!("Change event: {:?}", e);
}
fn handle_all_changes(e: Value) {
println!("Collection-wide change: {:?}", e);
}
// Subscribe to multiple records
let unsubscribe1 = pb.collection("posts").subscribe(
"RECORD_ID_1",
handle_change,
HashMap::new(),
HashMap::new()
)?;
let unsubscribe2 = pb.collection("posts").subscribe(
"RECORD_ID_2",
handle_change,
HashMap::new(),
HashMap::new()
)?;
let unsubscribe3 = pb.collection("posts").subscribe(
"*",
handle_all_changes,
HashMap::new(),
HashMap::new()
)?;
// Unsubscribe individually
unsubscribe1();
unsubscribe2();
unsubscribe3();
Event Structure
Each event received contains:
// Event structure:
// {
// "action": "create" | "update" | "delete", // Action type
// "record": { // Record data
// "id": "RECORD_ID",
// "collectionId": "COLLECTION_ID",
// "collectionName": "collection_name",
// "created": "2023-01-01 00:00:00.000Z",
// "updated": "2023-01-01 00:00:00.000Z",
// // ... other fields
// }
// }
PB_CONNECT Event
When the connection is established, you receive a PB_CONNECT event:
use serde_json::json;
pb.realtime.subscribe(
"PB_CONNECT",
|e: Value| {
println!("Connected! Client ID: {}", e["clientId"]);
// e["clientId"] - unique client identifier
},
HashMap::new(),
HashMap::new()
)?;
Subscription Topics
Collection-Level Subscription
Subscribe to all changes in a collection:
// Wildcard subscription - all records in collection
pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Event: {:?}", e);
},
HashMap::new(),
HashMap::new()
)?;
Access Control: Uses the collection’s ListRule to determine if the subscriber has access to receive events.
Record-Level Subscription
Subscribe to changes for a specific record:
// Specific record subscription
pb.collection("posts").subscribe(
"RECORD_ID",
|e: Value| {
println!("Record changed: {:?}", e);
},
HashMap::new(),
HashMap::new()
)?;
Access Control: Uses the collection’s ViewRule to determine if the subscriber has access to receive events.
Subscription Options
You can pass additional options when subscribing:
use serde_json::json;
let mut query = HashMap::new();
query.insert("filter".to_string(), json!(r#"status = "published""#));
query.insert("expand".to_string(), json!("author"));
let mut headers = HashMap::new();
headers.insert("X-Custom-Header".to_string(), "value".to_string());
pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Event: {:?}", e);
},
query,
headers
)?;
Expand Relations
Expand relations in the event data:
let mut query = HashMap::new();
query.insert("expand".to_string(), json!("author,categories"));
pb.collection("posts").subscribe(
"RECORD_ID",
|e: Value| {
if let Some(record) = e.get("record") {
if let Some(expand) = record.get("expand") {
if let Some(author) = expand.get("author") {
println!("Author: {}", author["name"]);
}
}
}
},
query,
HashMap::new()
)?;
Filter with Query Parameters
Use query parameters for API rule filtering:
let mut query = HashMap::new();
query.insert("filter".to_string(), json!(r#"status = "published""#));
pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Published post changed: {:?}", e);
},
query,
HashMap::new()
)?;
Unsubscribing
Unsubscribe from Specific Topic
// Remove all subscriptions for a specific record
pb.collection("posts").unsubscribe(Some("RECORD_ID".to_string()));
// Remove all wildcard subscriptions for the collection
pb.collection("posts").unsubscribe(Some("*".to_string()));
Unsubscribe from All
// Unsubscribe from all subscriptions in the collection
pb.collection("posts").unsubscribe(None);
// Or unsubscribe from everything
pb.realtime.unsubscribe();
Unsubscribe Using Returned Function
let unsubscribe = pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Event: {:?}", e);
},
HashMap::new(),
HashMap::new()
)?;
// Later...
unsubscribe(); // Removes this specific subscription
Connection Management
Disconnect Handler
Handle disconnection events:
// Note: The Rust SDK handles reconnection automatically
// You can check connection status through the realtime service
Automatic Reconnection
The SDK automatically:
- Reconnects when the connection is lost
- Resubmits all active subscriptions
- Handles network interruptions gracefully
- Closes connection after 5 minutes of inactivity (server-side timeout)
Authorization
Authenticated Subscriptions
Subscriptions respect authentication. If you’re authenticated, events are filtered based on your permissions:
// Authenticate first
pb.collection("users").auth_with_password(
"user@example.com",
"password",
HashMap::new(),
HashMap::new(),
None
).await?;
// Now subscribe - events will respect your permissions
pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Event: {:?}", e);
},
HashMap::new(),
HashMap::new()
)?;
Authorization Rules
- Collection-level (
*): UsesListRuleto determine access - Record-level: Uses
ViewRuleto determine access - Superusers: Can receive all events (if rules allow)
- Guests: Only receive events they have permission to see
Auth State Changes
When authentication state changes, you may need to resubscribe:
// After login/logout, resubscribe to update permissions
pb.collection("users").auth_with_password(
"user@example.com",
"password",
HashMap::new(),
HashMap::new(),
None
).await?;
// Re-subscribe to update auth state in realtime connection
pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Event: {:?}", e);
},
HashMap::new(),
HashMap::new()
)?;
Advanced Examples
Example 1: Real-time Chat
async fn setup_chat_room(
pb: &BosBase,
room_id: &str,
) -> Result<impl FnOnce(), Box<dyn std::error::Error>> {
let mut query = HashMap::new();
query.insert("filter".to_string(), json!(format!(r#"roomId = "{}""#, room_id)));
let unsubscribe = pb.collection("messages").subscribe(
"*",
move |e: Value| {
// Filter for this room only
if let Some(record) = e.get("record") {
if record.get("roomId").and_then(|r| r.as_str()) == Some(room_id) {
match e["action"].as_str() {
Some("create") => {
println!("New message: {:?}", record);
}
Some("delete") => {
println!("Message deleted: {}", record["id"]);
}
_ => {}
}
}
}
},
query,
HashMap::new()
)?;
Ok(unsubscribe)
}
// Usage
let unsubscribe = setup_chat_room(&pb, "ROOM_ID").await?;
// ... later
unsubscribe();
Example 2: Real-time Dashboard
async fn setup_dashboard(pb: &BosBase) -> Result<(), Box<dyn std::error::Error>> {
// Posts updates
let mut posts_query = HashMap::new();
posts_query.insert("filter".to_string(), json!(r#"status = "published""#));
posts_query.insert("expand".to_string(), json!("author"));
pb.collection("posts").subscribe(
"*",
|e: Value| {
match e["action"].as_str() {
Some("create") => {
println!("New post: {:?}", e["record"]);
}
Some("update") => {
println!("Post updated: {:?}", e["record"]);
}
_ => {}
}
},
posts_query,
HashMap::new()
)?;
// Comments updates
let mut comments_query = HashMap::new();
comments_query.insert("expand".to_string(), json!("user"));
pb.collection("comments").subscribe(
"*",
|e: Value| {
if let Some(record) = e.get("record") {
if let Some(post_id) = record.get("postId") {
println!("Comment on post {}: {:?}", post_id, record);
}
}
},
comments_query,
HashMap::new()
)?;
Ok(())
}
Example 3: User Activity Tracking
async fn track_user_activity(
pb: &BosBase,
user_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let mut query = HashMap::new();
query.insert("filter".to_string(), json!(format!(r#"author = "{}""#, user_id)));
pb.collection("posts").subscribe(
"*",
move |e: Value| {
// Only track changes to user's own posts
if let Some(record) = e.get("record") {
if record.get("author").and_then(|a| a.as_str()) == Some(user_id) {
println!("Your post {}: {:?}", e["action"], record["title"]);
if e["action"].as_str() == Some("update") {
println!("Post updated notification");
}
}
}
},
query,
HashMap::new()
)?;
Ok(())
}
Example 4: Real-time Collaboration
async fn track_document_edits(
pb: &BosBase,
document_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let mut query = HashMap::new();
query.insert("expand".to_string(), json!("lastEditor"));
pb.collection("documents").subscribe(
document_id,
|e: Value| {
if e["action"].as_str() == Some("update") {
if let Some(record) = e.get("record") {
let last_editor = record.get("lastEditor");
let updated_at = record.get("updated");
// Show who last edited the document
println!("Document edited by: {:?}, at: {:?}", last_editor, updated_at);
}
}
},
query,
HashMap::new()
)?;
Ok(())
}
Error Handling
use bosbase::errors::ClientResponseError;
match pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Event: {:?}", e);
},
HashMap::new(),
HashMap::new()
) {
Ok(unsubscribe) => {
println!("Subscribed successfully");
// Store unsubscribe function for later use
}
Err(err) => {
match err.status() {
403 => {
eprintln!("Permission denied");
}
404 => {
eprintln!("Collection not found");
}
_ => {
eprintln!("Subscription error: {:?}", err);
}
}
}
}
Best Practices
- Unsubscribe When Done: Always unsubscribe when components unmount or subscriptions are no longer needed
- Handle Disconnections: The SDK handles reconnection automatically, but monitor for issues
- Filter Server-Side: Use query parameters to filter events server-side when possible
- Limit Subscriptions: Don’t subscribe to more collections than necessary
- Use Record-Level When Possible: Prefer record-level subscriptions over collection-level when you only need specific records
- Monitor Connection: Track connection state for debugging and user feedback
- Handle Errors: Wrap subscriptions in error handling
- Respect Permissions: Understand that events respect API rules and permissions
Limitations
- Maximum Subscriptions: Up to 1000 subscriptions per client
- Topic Length: Maximum 2500 characters per topic
- Idle Timeout: Connection closes after 5 minutes of inactivity
- Network Dependency: Requires stable network connection
- SSE Support: SSE requires modern HTTP clients (not available in all environments)
Troubleshooting
Connection Not Establishing
// Manually trigger connection by subscribing
let unsubscribe = pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Connected and receiving events");
},
HashMap::new(),
HashMap::new()
)?;
Events Not Received
- Check API rules - you may not have permission
- Verify subscription is active
- Check network connectivity
- Review server logs for errors
Memory Leaks
Always unsubscribe:
// Good
let unsubscribe = pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Event: {:?}", e);
},
HashMap::new(),
HashMap::new()
)?;
// ... later
unsubscribe();
// Bad - no cleanup
pb.collection("posts").subscribe(
"*",
|e: Value| {
println!("Event: {:?}", e);
},
HashMap::new(),
HashMap::new()
)?;
// Never unsubscribed - potential memory leak!
Related Documentation
- API Records - CRUD operations
- Collections - Collection configuration
- API Rules and Filters - Understanding API rules