Your First SierraDB Application
Let's build a simple e-commerce order management system using SierraDB to demonstrate event sourcing patterns.
Prerequisites
- SierraDB running locally (see Quick Start)
- Python 3.8+ with
redispackage, or Node.js 16+, or Rust 1.75+
Application Overview
We'll build an order management system that handles:
- Creating orders
- Adding/removing items
- Processing payments
- Shipping orders
Each action will generate events that we'll store in SierraDB.
Setting Up
Choose your preferred language:
Python Setup
pip install redis
# app.py
import redis
import json
import uuid
from datetime import datetime
from enum import Enum
class OrderStatus(Enum):
CREATED = "created"
PAID = "paid"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
# Connect to SierraDB
client = redis.Redis(host='localhost', port=9090, protocol=3)
Node.js Setup
npm install redis uuid
// app.js
const redis = require('redis');
const { v4: uuidv4 } = require('uuid');
const client = redis.createClient({
socket: { host: 'localhost', port: 9090 },
RESP: 3
});
await client.connect();
const OrderStatus = {
CREATED: 'created',
PAID: 'paid',
SHIPPED: 'shipped',
DELIVERED: 'delivered',
CANCELLED: 'cancelled'
};
Define Events
Let's define the events our system will generate:
# Python
class OrderEvents:
@staticmethod
def order_created(order_id, customer_id, items):
return {
'order_id': order_id,
'customer_id': customer_id,
'items': items,
'created_at': datetime.utcnow().isoformat(),
'status': OrderStatus.CREATED.value
}
@staticmethod
def item_added(order_id, item_id, quantity, price):
return {
'order_id': order_id,
'item_id': item_id,
'quantity': quantity,
'price': price,
'added_at': datetime.utcnow().isoformat()
}
@staticmethod
def payment_processed(order_id, amount, payment_method):
return {
'order_id': order_id,
'amount': amount,
'payment_method': payment_method,
'processed_at': datetime.utcnow().isoformat()
}
// JavaScript
class OrderEvents {
static orderCreated(orderId, customerId, items) {
return {
order_id: orderId,
customer_id: customerId,
items: items,
created_at: new Date().toISOString(),
status: OrderStatus.CREATED
};
}
static itemAdded(orderId, itemId, quantity, price) {
return {
order_id: orderId,
item_id: itemId,
quantity: quantity,
price: price,
added_at: new Date().toISOString()
};
}
static paymentProcessed(orderId, amount, paymentMethod) {
return {
order_id: orderId,
amount: amount,
payment_method: paymentMethod,
processed_at: new Date().toISOString()
};
}
}
Command Handlers
Now let's create functions that execute business logic and store events:
# Python
def create_order(customer_id, initial_items=None):
order_id = str(uuid.uuid4())
stream_id = f"order-{order_id}"
# Create the order
event_data = OrderEvents.order_created(order_id, customer_id, initial_items or [])
version = client.execute_command(
'EAPPEND', stream_id, 'OrderCreated',
'PAYLOAD', json.dumps(event_data)
)
print(f"Order {order_id} created at version {version}")
return order_id
def add_item_to_order(order_id, item_id, quantity, price):
stream_id = f"order-{order_id}"
# Get current version for optimistic concurrency
current_version = client.execute_command('ESVER', stream_id)
if current_version == 0:
raise ValueError(f"Order {order_id} does not exist")
event_data = OrderEvents.item_added(order_id, item_id, quantity, price)
try:
version = client.execute_command(
'EAPPEND', stream_id, 'ItemAdded',
'EXPECTED_VERSION', current_version,
'PAYLOAD', json.dumps(event_data)
)
print(f"Item added to order {order_id} at version {version}")
return version
except redis.ResponseError as e:
if 'WRONG_EXPECTED_VERSION' in str(e):
print("Concurrent modification detected, please retry")
raise
def process_payment(order_id, amount, payment_method):
stream_id = f"order-{order_id}"
event_data = OrderEvents.payment_processed(order_id, amount, payment_method)
version = client.execute_command(
'EAPPEND', stream_id, 'PaymentProcessed',
'PAYLOAD', json.dumps(event_data)
)
print(f"Payment processed for order {order_id} at version {version}")
return version
Building Projections
Create read models from the events:
# Python
class OrderProjection:
def __init__(self):
self.order_id = None
self.customer_id = None
self.status = None
self.items = {}
self.total_amount = 0
self.created_at = None
self.payment_method = None
def build_order_projection(order_id):
stream_id = f"order-{order_id}"
# Read all events for this order
events = client.execute_command('ESCAN', stream_id, '-', '+')
projection = OrderProjection()
for event in events:
event_name = event[1]
payload = json.loads(event[5])
if event_name == 'OrderCreated':
projection.order_id = payload['order_id']
projection.customer_id = payload['customer_id']
projection.status = payload['status']
projection.created_at = payload['created_at']
# Initialize with any items created with the order
for item in payload.get('items', []):
projection.items[item['item_id']] = {
'quantity': item['quantity'],
'price': item['price']
}
elif event_name == 'ItemAdded':
item_id = payload['item_id']
if item_id in projection.items:
projection.items[item_id]['quantity'] += payload['quantity']
else:
projection.items[item_id] = {
'quantity': payload['quantity'],
'price': payload['price']
}
elif event_name == 'PaymentProcessed':
projection.status = OrderStatus.PAID.value
projection.payment_method = payload['payment_method']
# Calculate total amount
projection.total_amount = sum(
item['quantity'] * item['price']
for item in projection.items.values()
)
return projection
def get_order_details(order_id):
projection = build_order_projection(order_id)
return {
'order_id': projection.order_id,
'customer_id': projection.customer_id,
'status': projection.status,
'items': projection.items,
'total_amount': projection.total_amount,
'created_at': projection.created_at,
'payment_method': projection.payment_method
}
Real-time Updates
Set up subscriptions to keep projections updated:
# Python
import threading
import time
class OrderProjectionService:
def __init__(self):
self.projections = {} # In-memory cache
self.running = True
def start_subscription(self):
"""Start listening for order events"""
def subscription_worker():
try:
# Subscribe to all order streams
subscription_id = client.execute_command(
'ESUB', 'order-*', 'FROM', 'LATEST', 'WINDOW', '1000'
)
print(f"Started subscription {subscription_id}")
# In a real app, you'd use proper subscription handling
# This is a simplified example
while self.running:
time.sleep(1) # Poll for events
except Exception as e:
print(f"Subscription error: {e}")
thread = threading.Thread(target=subscription_worker)
thread.daemon = True
thread.start()
def update_projection(self, order_id):
"""Update projection for a specific order"""
projection = build_order_projection(order_id)
self.projections[order_id] = projection
print(f"Updated projection for order {order_id}")
def get_order(self, order_id):
"""Get order from cache or rebuild"""
if order_id not in self.projections:
self.update_projection(order_id)
return self.projections[order_id]
# Start the projection service
projection_service = OrderProjectionService()
projection_service.start_subscription()
Testing the Application
Let's test our order system:
# Python test script
def test_order_workflow():
print("=== Testing Order Workflow ===")
# 1. Create an order
order_id = create_order("customer-123")
# 2. Add some items
add_item_to_order(order_id, "laptop", 1, 999.99)
add_item_to_order(order_id, "mouse", 1, 29.99)
add_item_to_order(order_id, "keyboard", 1, 79.99)
# 3. Process payment
process_payment(order_id, 1109.97, "credit_card")
# 4. Get current order state
order_details = get_order_details(order_id)
print("\n=== Final Order State ===")
print(json.dumps(order_details, indent=2))
# 5. Show event history
print("\n=== Event History ===")
events = client.execute_command('ESCAN', f'order-{order_id}', '-', '+')
for i, event in enumerate(events, 1):
event_name = event[1]
timestamp = event[4]
payload = json.loads(event[5])
print(f"{i}. {event_name} at {timestamp}")
print(f" Data: {payload}")
if __name__ == "__main__":
test_order_workflow()
Expected Output
=== Testing Order Workflow ===
Order 123e4567-e89b-12d3-a456-426614174000 created at version 1
Item added to order 123e4567-e89b-12d3-a456-426614174000 at version 2
Item added to order 123e4567-e89b-12d3-a456-426614174000 at version 3
Item added to order 123e4567-e89b-12d3-a456-426614174000 at version 4
Payment processed for order 123e4567-e89b-12d3-a456-426614174000 at version 5
=== Final Order State ===
{
"order_id": "123e4567-e89b-12d3-a456-426614174000",
"customer_id": "customer-123",
"status": "paid",
"items": {
"laptop": {"quantity": 1, "price": 999.99},
"mouse": {"quantity": 1, "price": 29.99},
"keyboard": {"quantity": 1, "price": 79.99}
},
"total_amount": 1109.97,
"created_at": "2024-01-15T10:30:00.000Z",
"payment_method": "credit_card"
}
=== Event History ===
1. OrderCreated at 1705316200000
Data: {"order_id": "123e4567...", "customer_id": "customer-123", ...}
2. ItemAdded at 1705316201000
Data: {"order_id": "123e4567...", "item_id": "laptop", ...}
...
Key Takeaways
This example demonstrates several important event sourcing patterns:
1. Event-First Design
- Every business operation generates events
- Events are the source of truth
- Current state is derived from events
2. Optimistic Concurrency
- Use
EXPECTED_VERSIONto handle concurrent updates - Retry logic for version conflicts
- Eventual consistency for read models
3. Projection Patterns
- Build read models from event streams
- Cache projections for performance
- Real-time updates via subscriptions
4. Stream Design
- One stream per aggregate (order)
- Meaningful stream IDs
- Event versioning within streams
Next Steps
- Architecture Overview - Understand SierraDB internals