
Event Sourcing

Event sourcing is a pattern where application state is determined by a sequence of events. Instead of storing current state, you store the events that led to that state.

Why Event Sourcing?

Traditional CRUD applications store current state in mutable records. Event sourcing takes a different approach:

  • Audit Trail: Complete history of what happened and when
  • Temporal Queries: Query state at any point in time
  • Debugging: Replay events to reproduce issues
  • Flexibility: Derive new projections from existing events

Event Sourcing with SierraDB

SierraDB is purpose-built for event sourcing with:

  • Append-only storage optimized for event streams
  • Strong ordering guarantees within streams and partitions
  • Real-time subscriptions for building projections
  • Horizontal scalability to handle high event volumes

Core Concepts

Events

Events are immutable records of things that happened. When retrieved from SierraDB, events have the following structure:

{
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "partition_key": "550e8400-e29b-41d4-a716-446655440001",
  "partition_id": 5,
  "transaction_id": "550e8400-e29b-41d4-a716-446655440002",
  "partition_sequence": 123,
  "stream_version": 0,
  "timestamp": 1697641800000,
  "stream_id": "user-123",
  "event_name": "UserRegistered",
  "metadata": "{\"source\":\"registration-api\",\"correlation_id\":\"abc-123\"}",
  "payload": "{\"email\":\"alice@example.com\",\"name\":\"Alice Johnson\"}"
}

Event Fields:

  • event_id: Unique identifier for this event
  • partition_key: Key used to determine partition placement
  • partition_id: Partition where this event is stored
  • transaction_id: Identifier linking events in the same transaction
  • partition_sequence: Monotonic sequence number within the partition (0-based)
  • stream_version: Version of this event within its stream (0-based)
  • timestamp: Unix timestamp in milliseconds
  • stream_id: Identifier for the event stream
  • event_name: Type/name of the event
  • metadata: JSON string containing event metadata
  • payload: JSON string containing event data
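Because metadata and payload arrive as JSON strings rather than nested objects, clients typically decode them before use. A minimal sketch (the decode_event helper is our own illustration, not part of SierraDB):

```python
import json

def decode_event(event):
    """Return a copy of a SierraDB event with its JSON-string
    metadata and payload fields parsed into dicts."""
    decoded = dict(event)
    decoded["metadata"] = json.loads(event["metadata"])
    decoded["payload"] = json.loads(event["payload"])
    return decoded

raw = {
    "event_id": "550e8400-e29b-41d4-a716-446655440000",
    "stream_id": "user-123",
    "event_name": "UserRegistered",
    "stream_version": 0,
    "metadata": '{"source":"registration-api","correlation_id":"abc-123"}',
    "payload": '{"email":"alice@example.com","name":"Alice Johnson"}',
}

event = decode_event(raw)
print(event["payload"]["email"])  # alice@example.com
```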

Streams

A stream is a sequence of related events identified by a stream_id:

# All events for user-123
user-123: [UserRegistered, EmailVerified, ProfileUpdated, ...]

# All events for order-456
order-456: [OrderCreated, ItemAdded, ItemRemoved, OrderShipped, ...]

Stream Versions

Each event in a stream has a monotonic version number starting from 0:

user-123 version 0: UserRegistered
user-123 version 1: EmailVerified
user-123 version 2: ProfileUpdated

Partition Sequences

Events are also assigned monotonic sequence numbers within their partition, starting from 0:

# Partition 5 sequence 0: UserRegistered (user-123, version 0)
# Partition 5 sequence 1: OrderCreated (order-456, version 0)
# Partition 5 sequence 2: EmailVerified (user-123, version 1)
# Partition 5 sequence 3: ItemAdded (order-456, version 1)

Partition sequences enable:

  • Efficient partition scanning with EPSCAN
  • Cross-stream ordering within partitions
  • Replication ordering across nodes
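The cross-stream ordering property can be shown in isolation: given events from several streams in the same partition, sorting by partition_sequence reproduces the order in which the partition recorded them. This helper is illustrative, not a SierraDB API:

```python
def partition_order(events):
    """Order events from one partition by partition_sequence,
    interleaving streams exactly as the partition recorded them."""
    return sorted(events, key=lambda e: e["partition_sequence"])

events = [
    {"stream_id": "order-456", "event_name": "OrderCreated",   "partition_sequence": 1},
    {"stream_id": "user-123",  "event_name": "UserRegistered", "partition_sequence": 0},
    {"stream_id": "order-456", "event_name": "ItemAdded",      "partition_sequence": 3},
    {"stream_id": "user-123",  "event_name": "EmailVerified",  "partition_sequence": 2},
]

for e in partition_order(events):
    print(e["partition_sequence"], e["stream_id"], e["event_name"])
```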

Basic Operations

Appending Events

Store events in streams with EAPPEND:

# Append to a stream
EAPPEND user-123 UserRegistered PAYLOAD '{"email":"alice@example.com"}'

# With expected version for optimistic concurrency
EAPPEND user-123 EmailVerified EXPECTED_VERSION 0 PAYLOAD '{"verified_at":"2024-01-01"}'
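From Python, the same commands can be issued through a RESP client such as redis-py. A small helper that assembles the EAPPEND argument list (the argument order follows the examples above; the helper name is our own):

```python
import json

def eappend_args(stream_id, event_name, payload, expected_version=None):
    """Build an EAPPEND argument list in the shape shown above;
    the payload dict is serialized to a JSON string."""
    args = ["EAPPEND", stream_id, event_name]
    if expected_version is not None:
        args += ["EXPECTED_VERSION", str(expected_version)]
    args += ["PAYLOAD", json.dumps(payload)]
    return args

# With a live connection you would pass this straight through:
# client.execute_command(*eappend_args("user-123", "UserRegistered",
#                                      {"email": "alice@example.com"}))
print(eappend_args("user-123", "EmailVerified",
                   {"verified_at": "2024-01-01"}, expected_version=0))
```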

Reading Events

Read events from streams with ESCAN:

# Read all events from beginning
ESCAN user-123 - +

# Read events from version 1 onwards
ESCAN user-123 1 +

# Read specific version range
ESCAN user-123 0 4

Stream Information

Get stream metadata:

# Get current stream version
ESVER user-123

# Returns: 2 (if stream has 3 events)

Advanced Patterns

Aggregate Roots

Group related events under an aggregate:

# Order aggregate - all events affect the same order
EAPPEND order-456 OrderCreated PAYLOAD '{"customer_id":"user-123"}'
EAPPEND order-456 ItemAdded PAYLOAD '{"product_id":"laptop","qty":1}'
EAPPEND order-456 ItemRemoved PAYLOAD '{"product_id":"mouse","qty":1}'
EAPPEND order-456 OrderShipped PAYLOAD '{"tracking_number":"ABC123"}'
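Rehydrating an aggregate means folding its stream into current state. A sketch of that fold for order events like the ones above (pure Python, no SierraDB calls; the field names are illustrative):

```python
import json

def rehydrate_order(events):
    """Fold (event_name, payload) pairs from an order stream into
    current state: item quantities plus shipping status."""
    order = {"items": {}, "shipped": False}
    for name, payload in events:
        data = json.loads(payload)
        if name == "OrderCreated":
            order["customer_id"] = data["customer_id"]
        elif name == "ItemAdded":
            order["items"][data["product_id"]] = \
                order["items"].get(data["product_id"], 0) + data["qty"]
        elif name == "ItemRemoved":
            remaining = order["items"].get(data["product_id"], 0) - data["qty"]
            if remaining > 0:
                order["items"][data["product_id"]] = remaining
            else:
                order["items"].pop(data["product_id"], None)
        elif name == "OrderShipped":
            order["shipped"] = True
            order["tracking_number"] = data["tracking_number"]
    return order

stream = [
    ("OrderCreated", '{"customer_id":"user-123"}'),
    ("ItemAdded",    '{"product_id":"laptop","qty":1}'),
    ("ItemAdded",    '{"product_id":"mouse","qty":1}'),
    ("ItemRemoved",  '{"product_id":"mouse","qty":1}'),
    ("OrderShipped", '{"tracking_number":"ABC123"}'),
]
print(rehydrate_order(stream))
```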

Cross-Stream Transactions

Use EMAPPEND for transactions across multiple streams in the same partition:

# Transfer money between accounts atomically
EMAPPEND 550e8400-e29b-41d4-a716-446655440000 \
account-123 MoneyWithdrawn PAYLOAD '{"amount":100}' \
account-456 MoneyDeposited PAYLOAD '{"amount":100}'
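A helper mirroring that command shape from Python. The argument layout (partition key followed by stream/event/payload triples) is inferred from the example above, so verify it against the EMAPPEND reference before relying on it:

```python
import json

def emappend_args(partition_key, *entries):
    """Build an EMAPPEND argument list: the partition key, then a
    (stream_id, event_name, payload) triple per event."""
    args = ["EMAPPEND", partition_key]
    for stream_id, event_name, payload in entries:
        args += [stream_id, event_name, "PAYLOAD", json.dumps(payload)]
    return args

print(emappend_args(
    "550e8400-e29b-41d4-a716-446655440000",
    ("account-123", "MoneyWithdrawn", {"amount": 100}),
    ("account-456", "MoneyDeposited", {"amount": 100}),
))
```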

Event Metadata

Include context and correlation information:

EAPPEND user-123 ProfileUpdated \
PAYLOAD '{"name":"Alice Smith"}' \
METADATA '{"user_id":"user-123","source":"web-app","correlation_id":"req-789"}'

Building Projections

Projections derive read models from events:

import json
import redis

client = redis.Redis(host='localhost', port=9090, protocol=3)

# Read all user events and build a projection
def build_user_projection(user_id):
    events = client.execute_command('ESCAN', f'user-{user_id}', '-', '+')

    user = {}
    for event in events['events']:  # events is a RESP3 map with an 'events' array
        event_name = event['event_name']
        payload = json.loads(event['payload'])  # payload is a JSON string

        if event_name == 'UserRegistered':
            user['email'] = payload['email']
            user['name'] = payload['name']
            user['created_at'] = event['timestamp']

        elif event_name == 'EmailVerified':
            user['email_verified'] = True
            user['verified_at'] = payload['verified_at']

        elif event_name == 'ProfileUpdated':
            user.update(payload)

    return user

# Build projection for user-123
user_projection = build_user_projection(123)
print(user_projection)

Real-time Projections

Use subscriptions to keep projections up-to-date:

import json
import redis

client = redis.Redis(host='localhost', port=9090, protocol=3)

# Subscribe to all user events
subscription = client.execute_command('ESUB', 'user-*', 'FROM', 'LATEST')

# Process events as they arrive
def update_projection(event):
    stream_id = event['stream_id']
    event_name = event['event_name']
    payload = json.loads(event['payload'])

    # Update your read model/database
    if event_name == 'UserRegistered':
        save_user_to_db(stream_id, payload)
    elif event_name == 'ProfileUpdated':
        update_user_in_db(stream_id, payload)

# Process subscription events
for event in subscription:
    update_projection(event)

Best Practices

Event Design

Do:

  • Use past-tense event names (UserRegistered, not RegisterUser)
  • Include all necessary data in the event
  • Make events immutable and self-contained
  • Use meaningful stream IDs

Don't:

  • Store references that might become invalid
  • Include computed values that can be derived
  • Make events too large (split into multiple if needed)

Stream Design

Aggregate per Stream:

# Good: One aggregate per stream
user-123: [UserRegistered, EmailVerified, ProfileUpdated]
order-456: [OrderCreated, ItemAdded, OrderShipped]

# Avoid: Multiple aggregates in one stream
mixed-789: [UserRegistered, OrderCreated, PaymentProcessed]

Consistent Partitioning:

# Use consistent partition keys for related streams
EAPPEND user-123 UserRegistered PARTITION_KEY user-partition-key
EAPPEND user-account-123 AccountCreated PARTITION_KEY user-partition-key

Error Handling

Handle concurrency with expected versions:

import json
import redis

client = redis.Redis(host='localhost', port=9090, protocol=3)

def update_user_profile(user_id, current_version, updates):
    try:
        client.execute_command('EAPPEND', f'user-{user_id}',
                               'ProfileUpdated',
                               'EXPECTED_VERSION', current_version,
                               'PAYLOAD', json.dumps(updates))
        return True
    except redis.ResponseError as e:
        if 'wrong expected version' in str(e):
            # Handle concurrency conflict
            return False
        raise
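On a version conflict, the usual recovery is to re-read the stream version and retry the append. A sketch of that loop against an in-memory stub client (the stub stands in for a real connection; the retry limit and exception type are our own assumptions):

```python
import json

class VersionConflict(Exception):
    """Stands in for a 'wrong expected version' server error."""

def append_with_retry(client, stream_id, event_name, payload, retries=3):
    """Optimistic-concurrency loop: re-read the current version and
    retry the append after each conflict, up to `retries` attempts."""
    for _ in range(retries):
        version = client.current_version(stream_id)
        try:
            client.append(stream_id, event_name,
                          expected_version=version,
                          payload=json.dumps(payload))
            return True
        except VersionConflict:
            continue  # a competing writer got in first; re-read and retry
    return False

class StubClient:
    """In-memory stand-in that fails the first append attempt."""
    def __init__(self):
        self.version = 0
        self.fail_once = True

    def current_version(self, stream_id):
        return self.version

    def append(self, stream_id, event_name, expected_version, payload):
        if self.fail_once:
            self.fail_once = False
            self.version += 1  # simulate a concurrent append
            raise VersionConflict()
        self.version += 1

ok = append_with_retry(StubClient(), "user-123", "ProfileUpdated",
                       {"name": "Alice Smith"})
print(ok)  # True
```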

Common Patterns

Saga Pattern

Coordinate long-running processes:

# Start a saga
EAPPEND saga-order-123 OrderSagaStarted PAYLOAD '{"order_id":"order-123"}'

# Process steps
EAPPEND saga-order-123 PaymentRequested PAYLOAD '{"amount":100}'
EAPPEND saga-order-123 PaymentCompleted PAYLOAD '{"transaction_id":"tx-456"}'
EAPPEND saga-order-123 InventoryReserved PAYLOAD '{"items":["laptop"]}'
EAPPEND saga-order-123 OrderSagaCompleted PAYLOAD '{}'
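The saga's own stream can be folded into a status projection to decide the next step. A minimal sketch (the step names match the commands above; the state machine and the `*Failed` convention are illustrative):

```python
def saga_status(event_names):
    """Derive a saga's status from the ordered event names in its stream."""
    status = "not_started"
    for name in event_names:
        if name == "OrderSagaStarted":
            status = "running"
        elif name == "OrderSagaCompleted":
            status = "completed"
        elif name.endswith("Failed"):
            status = "compensating"  # trigger compensating actions
    return status

print(saga_status([
    "OrderSagaStarted",
    "PaymentRequested",
    "PaymentCompleted",
    "InventoryReserved",
    "OrderSagaCompleted",
]))  # completed
```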

CQRS (Command Query Responsibility Segregation)

Separate read and write models:

# Command side: Write to event streams
def handle_register_user_command(command):
    event = UserRegistered(email=command.email, name=command.name)
    append_event('user-' + command.user_id, event)

# Query side: Read from projections
def get_user_profile(user_id):
    return user_projection_db.get(user_id)

# Keep projections updated via subscriptions
def update_user_projections():
    for event in subscribe_to_user_events():
        update_projection_from_event(event)

Next Steps