Pub/Sub API - C# SDK Documentation
BosBase now exposes a lightweight WebSocket-based publish/subscribe channel so SDK users can push and receive custom messages. The Go backend uses the ws transport and persists each published payload in the _pubsub_messages table so every node in a cluster can replay and fan-out messages to its local subscribers.
- Endpoint:
/api/pubsub(WebSocket) - Auth: the SDK automatically forwards
authStore.tokenas atokenquery parameter; cookie-based auth also works. Anonymous clients may subscribe, but publishing requires an authenticated token. - Reliability: automatic reconnect with topic re-subscription; messages are stored in the database and broadcasted to all connected nodes.
Quick Start
using Bosbase;
var client = new BosbaseClient("http://127.0.0.1:8090");
// Subscribe to a topic
var unsubscribe = await client.PubSub.SubscribeAsync("chat/general", (msg) =>
{
Console.WriteLine($"message {msg.Topic}: {msg.Data}");
});
// Publish to a topic (resolves when the server stores and accepts it)
var ack = await client.PubSub.PublishAsync("chat/general", new { text = "Hello team!" });
Console.WriteLine($"published at {ack.Created}");
// Later, stop listening
await unsubscribe();
API Surface
client.PubSub.PublishAsync(topic, data)→Task<PublishAck>client.PubSub.SubscribeAsync(topic, handler)→Task<Func<Task>>client.PubSub.UnsubscribeAsync(topic?)→Task(omittopicto drop all topics)client.PubSub.Disconnect()to explicitly close the socket and clear pending requests.client.PubSub.IsConnectedexposes the current WebSocket state.
Notes for Clusters
- Messages are written to
_pubsub_messageswith a timestamp; every running node polls the table and pushes new rows to its connected WebSocket clients. - Old pub/sub rows are cleaned up automatically after a day to keep the table small.
- If a node restarts, it resumes from the latest message and replays new rows as they are inserted, so connected clients on other nodes stay in sync.
Complete Examples
Example 1: Chat Application
using Bosbase;
var client = new BosbaseClient("http://127.0.0.1:8090");
// Authenticate (required for publishing)
await client.Collection("users").AuthWithPasswordAsync("user@example.com", "password");
// Subscribe to chat room
var unsubscribe = await client.PubSub.SubscribeAsync("chat/room1", (msg) =>
{
var data = msg.Data as Dictionary<string, object?>;
Console.WriteLine($"[{msg.Created}] {data?["user"]}: {data?["message"]}");
});
// Publish a message
await client.PubSub.PublishAsync("chat/room1", new Dictionary<string, object?>
{
["user"] = "user@example.com",
["message"] = "Hello everyone!"
});
// Keep the connection alive
await Task.Delay(TimeSpan.FromMinutes(5));
// Unsubscribe when done
await unsubscribe();
Example 2: Real-time Notifications
class NotificationService
{
private readonly BosbaseClient _client;
private Func<Task>? _unsubscribe;
public NotificationService(BosbaseClient client)
{
_client = client;
}
public async Task StartListening(string userId)
{
_unsubscribe = await _client.PubSub.SubscribeAsync($"notifications/{userId}", (msg) =>
{
var notification = msg.Data as Dictionary<string, object?>;
HandleNotification(notification);
});
}
private void HandleNotification(Dictionary<string, object?>? notification)
{
var type = notification?["type"]?.ToString();
var message = notification?["message"]?.ToString();
Console.WriteLine($"Notification [{type}]: {message}");
// Show system notification, update UI, etc.
}
public async Task SendNotification(string userId, string type, string message)
{
await _client.PubSub.PublishAsync($"notifications/{userId}", new Dictionary<string, object?>
{
["type"] = type,
["message"] = message,
["timestamp"] = DateTime.UtcNow.ToString("O")
});
}
public async Task StopListening()
{
if (_unsubscribe != null)
{
await _unsubscribe();
_unsubscribe = null;
}
}
}
// Usage
var notificationService = new NotificationService(client);
await notificationService.StartListening("user123");
await notificationService.SendNotification("user123", "info", "Your order has been shipped");
Example 3: Multi-topic Subscription
var client = new BosbaseClient("http://127.0.0.1:8090");
// Subscribe to multiple topics
var unsubscribe1 = await client.PubSub.SubscribeAsync("events/system", (msg) =>
{
Console.WriteLine($"System event: {msg.Data}");
});
var unsubscribe2 = await client.PubSub.SubscribeAsync("events/user", (msg) =>
{
Console.WriteLine($"User event: {msg.Data}");
});
// Publish to different topics
await client.PubSub.PublishAsync("events/system", new { type = "maintenance", message = "Scheduled maintenance in 1 hour" });
await client.PubSub.PublishAsync("events/user", new { userId = "123", action = "login" });
// Unsubscribe from specific topics
await unsubscribe1();
await unsubscribe2();
Example 4: Connection Status Monitoring
class PubSubManager
{
private readonly BosbaseClient _client;
private readonly System.Timers.Timer _statusTimer;
public PubSubManager(BosbaseClient client)
{
_client = client;
_statusTimer = new System.Timers.Timer(5000); // Check every 5 seconds
_statusTimer.Elapsed += (sender, e) => CheckConnectionStatus();
}
public void StartMonitoring()
{
_statusTimer.Start();
}
public void StopMonitoring()
{
_statusTimer.Stop();
}
private void CheckConnectionStatus()
{
var isConnected = _client.PubSub.IsConnected;
Console.WriteLine($"PubSub connection status: {(isConnected ? "Connected" : "Disconnected")}");
if (!isConnected)
{
Console.Warn("PubSub connection lost. Reconnection will be attempted automatically.");
}
}
}
// Usage
var manager = new PubSubManager(client);
manager.StartMonitoring();
var unsubscribe = await client.PubSub.SubscribeAsync("test/topic", (msg) =>
{
Console.WriteLine($"Received: {msg.Data}");
});
// Connection status is monitored automatically
Example 5: Error Handling
try
{
var unsubscribe = await client.PubSub.SubscribeAsync("test/topic", (msg) =>
{
try
{
// Process message
ProcessMessage(msg);
}
catch (Exception ex)
{
Console.Error.WriteLine($"Error processing message: {ex.Message}");
}
});
await client.PubSub.PublishAsync("test/topic", new { data = "test" });
}
catch (Exception ex)
{
Console.Error.WriteLine($"PubSub error: {ex.Message}");
}
Best Practices
- Error Handling: Always wrap message handlers in try-catch blocks
- Connection Management: Use
IsConnectedto check connection status - Cleanup: Always unsubscribe when done to free resources
- Reconnection: The SDK handles automatic reconnection, but monitor connection status
- Message Format: Use consistent message formats for easier processing
- Authentication: Ensure authentication before publishing messages
- Topic Naming: Use hierarchical topic names (e.g.,
chat/room1,notifications/user123)
Limitations
- WebSocket Only: Requires WebSocket support
- Authentication Required: Publishing requires authentication
- Message Size: Large messages may impact performance
- Connection Limits: Be mindful of connection limits on the server
Related Documentation
- Realtime API - Real-time record subscriptions
- Authentication - User authentication