Realtime API - Kotlin SDK Documentation
Overview
The Realtime API enables real-time updates for collection records using Server-Sent Events (SSE). It allows you to subscribe to changes in collections or specific records and receive instant notifications when records are created, updated, or deleted.
Key Features:
- Real-time notifications for record changes
- Collection-level and record-level subscriptions
- Automatic connection management and reconnection
- Authorization support
- Subscription options (expand, custom headers, query params)
- Event-driven architecture
Backend Endpoints:
GET /api/realtime- Establish SSE connectionPOST /api/realtime- Set subscriptions
📖 Reference: For detailed realtime concepts, see the JavaScript SDK Realtime documentation.
How It Works
- Connection: The SDK establishes an SSE connection to
/api/realtime - Client ID: Server sends
PB_CONNECTevent with a uniqueclientId - Subscriptions: Client submits subscription topics via POST request
- Events: Server sends events when matching records change
- Reconnection: SDK automatically reconnects on connection loss
Basic Usage
Subscribe to Collection Changes
Subscribe to all changes in a collection:
import com.bosbase.sdk.BosBase
val pb = BosBase("http://127.0.0.1:8090")
// Subscribe to all changes in the 'posts' collection
val unsubscribe = pb.collection("posts").subscribe("*") { event ->
val action = event["action"]?.toString() // 'create', 'update', or 'delete'
val record = event["record"] // The record data
println("Action: $action")
println("Record: $record")
}
// Later, unsubscribe
unsubscribe()
Subscribe to Specific Record
Subscribe to changes for a single record:
// Subscribe to changes for a specific post
val unsubscribe = pb.collection("posts").subscribe("RECORD_ID") { event ->
val record = event["record"]
val action = event["action"]?.toString()
println("Record changed: $record")
println("Action: $action")
}
// Unsubscribe when done
unsubscribe()
Multiple Subscriptions
You can subscribe multiple times to the same or different topics:
// Subscribe to multiple records
val unsubscribe1 = pb.collection("posts").subscribe("RECORD_ID_1") { event ->
handleChange(event)
}
val unsubscribe2 = pb.collection("posts").subscribe("RECORD_ID_2") { event ->
handleChange(event)
}
val unsubscribe3 = pb.collection("posts").subscribe("*") { event ->
handleAllChanges(event)
}
fun handleChange(event: Map<String, Any?>) {
println("Change event: $event")
}
fun handleAllChanges(event: Map<String, Any?>) {
println("Collection-wide change: $event")
}
// Unsubscribe individually
unsubscribe1()
unsubscribe2()
unsubscribe3()
Subscription Options
With Expand
Subscribe with relation expansion:
val unsubscribe = pb.collection("posts").subscribe(
topic = "*",
callback = { event ->
val record = event["record"]
println("Record: $record")
},
query = mapOf("expand" to "author,categories")
)
With Custom Headers
val unsubscribe = pb.collection("posts").subscribe(
topic = "*",
callback = { event ->
println("Event: $event")
},
headers = mapOf(
"X-Custom-Header" to "value"
)
)
With Query Parameters
val unsubscribe = pb.collection("posts").subscribe(
topic = "*",
callback = { event ->
println("Event: $event")
},
query = mapOf(
"expand" to "author",
"fields" to "id,title,content"
)
)
Unsubscribing
Unsubscribe from Specific Topic
// Unsubscribe from a specific record
pb.collection("posts").unsubscribe("RECORD_ID")
// Unsubscribe from all collection subscriptions
pb.collection("posts").unsubscribe()
Unsubscribe by Prefix
// Unsubscribe from all topics starting with a prefix
pb.realtime.unsubscribeByPrefix("posts/")
Unsubscribe All
// Unsubscribe from all realtime subscriptions
pb.realtime.unsubscribe()
Connection Management
Check Connection Status
val isConnected = pb.realtime.clientId.isNotBlank()
println("Realtime connected: $isConnected")
Manual Disconnect
// Manually disconnect (will not auto-reconnect)
pb.realtime.disconnect()
Disconnect Callback
Handle disconnection events:
pb.realtime.onDisconnect = { activeSubscriptions ->
println("Disconnected. Active subscriptions: $activeSubscriptions")
// Handle reconnection logic if needed
}
Event Structure
Events received in callbacks have the following structure:
{
"action": "create" | "update" | "delete",
"record": { /* record data */ }
}
Accessing Event Data
val unsubscribe = pb.collection("posts").subscribe("*") { event ->
val action = event["action"]?.toString()
val record = event["record"] as? Map<String, Any?>
when (action) {
"create" -> {
val title = record?.get("title")?.toString()
println("New post created: $title")
}
"update" -> {
val id = record?.get("id")?.toString()
println("Post updated: $id")
}
"delete" -> {
val id = record?.get("id")?.toString()
println("Post deleted: $id")
}
}
}
Error Handling
Realtime subscriptions handle errors gracefully with automatic reconnection:
val unsubscribe = pb.collection("posts").subscribe("*") { event ->
try {
// Process event
val record = event["record"]
println("Received: $record")
} catch (e: Exception) {
println("Error processing event: $e")
}
}
// The SDK will automatically reconnect on connection loss
Examples
Complete Realtime Example
import com.bosbase.sdk.BosBase
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.json.contentOrNull
fun main() {
val pb = BosBase("http://localhost:8090")
// Authenticate if needed
pb.collection("users").authWithPassword("user@example.com", "password")
// Subscribe to all posts
val unsubscribe = pb.collection("posts").subscribe("*") { event ->
val action = event["action"]?.toString()
val record = event["record"] as? Map<String, Any?>
when (action) {
"create" -> {
val title = record?.get("title")?.toString()
println("✨ New post: $title")
}
"update" -> {
val title = record?.get("title")?.toString()
println("📝 Updated post: $title")
}
"delete" -> {
val id = record?.get("id")?.toString()
println("🗑️ Deleted post: $id")
}
}
}
println("Listening for realtime updates...")
// Keep the program running
Thread.sleep(60000) // Listen for 60 seconds
// Clean up
unsubscribe()
pb.realtime.disconnect()
}
Real-time Chat Example
class ChatManager(private val pb: BosBase, private val roomId: String) {
private var unsubscribe: (() -> Unit)? = null
fun startListening(onMessage: (Map<String, Any?>) -> Unit) {
unsubscribe = pb.collection("messages").subscribe("*") { event ->
val action = event["action"]?.toString()
val record = event["record"] as? Map<String, Any?>
if (action == "create") {
val messageRoomId = record?.get("roomId")?.toString()
if (messageRoomId == roomId) {
onMessage(record)
}
}
}
}
fun stopListening() {
unsubscribe?.invoke()
unsubscribe = null
}
}
// Usage
val chatManager = ChatManager(pb, "room_123")
chatManager.startListening { message ->
val text = message["text"]?.toString()
val author = message["author"]?.toString()
println("$author: $text")
}
Live Dashboard Example
class LiveDashboard(private val pb: BosBase) {
private val subscriptions = mutableListOf<() -> Unit>()
fun start() {
// Subscribe to posts
subscriptions.add(
pb.collection("posts").subscribe("*") { event ->
updatePostsCounter(event)
}
)
// Subscribe to users
subscriptions.add(
pb.collection("users").subscribe("*") { event ->
updateUsersCounter(event)
}
)
// Subscribe to comments
subscriptions.add(
pb.collection("comments").subscribe("*") { event ->
updateCommentsCounter(event)
}
)
}
fun stop() {
subscriptions.forEach { it.invoke() }
subscriptions.clear()
}
private fun updatePostsCounter(event: Map<String, Any?>) {
val action = event["action"]?.toString()
println("Posts: $action event received")
}
private fun updateUsersCounter(event: Map<String, Any?>) {
val action = event["action"]?.toString()
println("Users: $action event received")
}
private fun updateCommentsCounter(event: Map<String, Any?>) {
val action = event["action"]?.toString()
println("Comments: $action event received")
}
}
// Usage
val dashboard = LiveDashboard(pb)
dashboard.start()
// Later...
dashboard.stop()
Android Realtime Example
import android.os.Handler
import android.os.Looper
import com.bosbase.sdk.BosBase
class RealtimeViewModel(private val pb: BosBase) {
private val handler = Handler(Looper.getMainLooper())
private var unsubscribe: (() -> Unit)? = null
fun observePosts(onUpdate: (List<Map<String, Any?>>) -> Unit) {
unsubscribe = pb.collection("posts").subscribe("*") { event ->
// Update on main thread
handler.post {
// Fetch updated list
val posts = pb.collection("posts").getList(page = 1, perPage = 50)
onUpdate(posts.items.map { it.jsonObject })
}
}
}
fun stopObserving() {
unsubscribe?.invoke()
unsubscribe = null
}
}