EPSEQ
Get the current sequence number for a partition.
Syntax
EPSEQ <partition>
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| partition | Partition Selector | Yes | Partition selector (partition ID 0-65535 or UUID key) |
Partition Selector
You can specify partitions in two ways:
- Partition ID: Integer from 0 to partition_count-1 (e.g., 42)
- Partition Key: UUID that maps to a partition (e.g., 550e8400-e29b-41d4-a716-446655440000)
Return Value
Integer or Null: Current sequence number of the partition, or null if no events exist in the partition.
- If partition has events: Returns the current sequence number (starting from 1)
- If partition is empty: Returns null
Examples
Get Partition Sequence by ID
EPSEQ 42
Returns:
1234
Get Partition Sequence by UUID
EPSEQ 550e8400-e29b-41d4-a716-446655440000
Returns:
567
Empty Partition
EPSEQ 99
Returns:
null
Client Examples
Python
import redis
# Shared connection used by the helpers below: RESP3 (protocol=3) on localhost:9090.
client = redis.Redis(host='localhost', port=9090, protocol=3)
def get_partition_sequence(partition_id):
    """Return whatever EPSEQ reports for ``partition_id``.

    The reply is passed through untouched: an integer sequence number, or
    ``None`` when the server answers null for a partition with no events.
    """
    return client.execute_command('EPSEQ', partition_id)
def wait_for_partition_progress(partition_id, target_sequence, timeout=30):
    """Poll a partition until it reaches ``target_sequence`` or time runs out.

    Args:
        partition_id: Partition selector passed to ``get_partition_sequence``.
        target_sequence: Sequence number the partition must reach or exceed.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True as soon as the partition's sequence is >= ``target_sequence``;
        False if the timeout elapses first (including when the partition
        stays empty for the whole period).
    """
    import time

    # Use the monotonic clock: wall-clock time can jump (NTP, manual changes)
    # and would make the timeout fire early or never.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        current = get_partition_sequence(partition_id)
        if current is not None and current >= target_sequence:
            return True
        # Brief pause between polls so the server is not hammered.
        time.sleep(0.1)
    return False
# Report where partition 5 currently stands
current_seq = get_partition_sequence(5)
print(f"Partition 5 current sequence: {current_seq}")

# Block (up to the default timeout) until partition 5 reaches sequence 1000
print(
    "Partition reached target sequence"
    if wait_for_partition_progress(5, 1000)
    else "Timeout waiting for partition progress"
)
JavaScript
const redis = require('redis');

// RESP3 connection to the server on localhost:9090.
const client = redis.createClient({
  socket: { host: 'localhost', port: 9090 },
  RESP: 3
});

/**
 * Query EPSEQ for each partition, one command at a time.
 *
 * @param {number[]} partitionIds Partition IDs to query.
 * @returns {Promise<Object>} Map of partition ID to the EPSEQ reply
 *   (sequence number, or null for an empty partition).
 */
async function getPartitionStatus(partitionIds) {
  const status = {};
  for (const partitionId of partitionIds) {
    const sequence = await client.sendCommand(['EPSEQ', partitionId.toString()]);
    status[partitionId] = sequence;
  }
  return status;
}

// `await` is not allowed at the top level of a CommonJS module (this file
// uses `require`), so the script body lives in an async entry point.
async function main() {
  await client.connect();
  try {
    // Check status of multiple partitions
    const partitions = [0, 1, 2, 3, 4, 5, 6, 7];
    const status = await getPartitionStatus(partitions);
    console.log('Partition Status:');
    for (const [partition, sequence] of Object.entries(status)) {
      // `??` only substitutes for null/undefined, i.e. an empty partition.
      console.log(` Partition ${partition}: ${sequence ?? 'empty'}`);
    }
  } finally {
    // Always release the connection, even if a command failed.
    await client.disconnect();
  }
}

main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
Rust
use redis::{Commands, RedisResult};
use uuid::Uuid;
/// Ask the server for the current sequence of a partition addressed by numeric ID.
///
/// Opens a fresh connection to `redis://127.0.0.1:9090/` on every call and
/// returns `None` when the reply is null.
fn get_partition_sequence(partition_id: u16) -> RedisResult<Option<u64>> {
    let mut con = redis::Client::open("redis://127.0.0.1:9090/")?.get_connection()?;
    // The reply type is inferred from the function's return type.
    redis::cmd("EPSEQ").arg(partition_id).query(&mut con)
}
/// Ask the server for the current sequence of a partition addressed by UUID key.
///
/// The UUID is sent in its string form. Opens a fresh connection to
/// `redis://127.0.0.1:9090/` on every call and returns `None` when the reply
/// is null.
fn get_partition_sequence_by_key(partition_key: &Uuid) -> RedisResult<Option<u64>> {
    let mut con = redis::Client::open("redis://127.0.0.1:9090/")?.get_connection()?;
    let key_text = partition_key.to_string();
    redis::cmd("EPSEQ").arg(key_text).query(&mut con)
}
/// Demonstrates both lookup styles: by numeric partition ID and by UUID key.
fn main() -> RedisResult<()> {
    // Lookup by partition ID
    if let Some(seq) = get_partition_sequence(5)? {
        println!("Partition 5 sequence: {}", seq);
    } else {
        println!("Partition 5 is empty");
    }

    // Lookup by partition key
    let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();
    if let Some(seq) = get_partition_sequence_by_key(&partition_key)? {
        println!("Partition key {} sequence: {}", partition_key, seq);
    } else {
        println!("Partition is empty");
    }

    Ok(())
}
Use Cases
Monitoring Progress
Monitor partition progress across a cluster:
def monitor_cluster_progress(partition_count):
    """Print per-partition sequences, their total, and the slowest partition.

    Runs forever, polling every partition from 0 to ``partition_count - 1``
    with EPSEQ and sleeping five seconds between rounds. Empty partitions
    (null reply) are listed as "empty" and excluded from the total and from
    the slowest-partition calculation.

    Args:
        partition_count: Number of partitions to poll on each round.
    """
    # Imported locally so this snippet runs on its own; nothing else here
    # brings ``time`` into module scope.
    import time

    while True:
        total_events = 0
        slowest_partition = None
        slowest_sequence = float('inf')
        print("Partition Progress:")
        for partition_id in range(partition_count):
            sequence = client.execute_command('EPSEQ', partition_id)
            if sequence is None:
                print(f" Partition {partition_id}: empty")
                continue
            total_events += sequence
            # Strict comparison keeps the lowest-numbered partition on ties.
            if sequence < slowest_sequence:
                slowest_sequence = sequence
                slowest_partition = partition_id
            print(f" Partition {partition_id}: {sequence}")
        print(f"Total events: {total_events}")
        if slowest_partition is not None:
            print(f"Slowest partition: {slowest_partition} (seq {slowest_sequence})")
        time.sleep(5)
Checkpoint Management
Track processing checkpoints:
class PartitionCheckpoint:
    """Remembers how far one partition has been processed.

    The checkpoint starts at sequence 0 and is advanced by the caller via
    ``mark_processed``; the partition head is read from the server with EPSEQ.
    """

    def __init__(self, partition_id):
        self.last_processed_sequence = 0
        self.partition_id = partition_id

    def get_unprocessed_count(self):
        """Return how many events lie beyond the checkpoint (never negative)."""
        head = client.execute_command('EPSEQ', self.partition_id)
        if head is None:
            # Null reply: the partition has no events at all.
            return 0
        backlog = head - self.last_processed_sequence
        return backlog if backlog > 0 else 0

    def mark_processed(self, sequence):
        """Record ``sequence`` as the last processed event."""
        self.last_processed_sequence = sequence

    def needs_processing(self):
        """Return True when at least one event is still unprocessed."""
        return self.get_unprocessed_count() > 0
# Track partition 5 and report its backlog, if any
checkpoint = PartitionCheckpoint(5)
if checkpoint.needs_processing():
    unprocessed = checkpoint.get_unprocessed_count()
    print(f"Partition 5 has {unprocessed} unprocessed events")
Load Balancing
Balance work across partitions:
def find_most_active_partition(partition_count):
    """Return ``(partition_id, sequence)`` for the highest sequence seen.

    Partitions 0 to ``partition_count - 1`` are queried with EPSEQ. Empty
    partitions are skipped; on ties the lowest partition ID wins. When no
    partition has events the result is ``(None, -1)``.
    """
    best_id, best_seq = None, -1
    for pid in range(partition_count):
        seq = client.execute_command('EPSEQ', pid)
        if seq is None or seq <= best_seq:
            continue
        best_id, best_seq = pid, seq
    return best_id, best_seq
def balance_processing_load(partition_count, worker_count):
    """Split partitions across workers, using sequence numbers as load.

    Partitions are taken from highest to lowest sequence and each is handed
    to whichever worker currently carries the smallest total (lowest index
    on ties). Returns one list of partition IDs per worker.
    """
    # Empty partitions (null reply) count as zero load.
    activity = {
        pid: client.execute_command('EPSEQ', pid) or 0
        for pid in range(partition_count)
    }

    # Busiest partitions first
    by_activity = sorted(activity.items(), key=lambda entry: entry[1], reverse=True)

    workers = [[] for _ in range(worker_count)]
    worker_loads = [0] * worker_count
    for pid, load in by_activity:
        # Index of the least-loaded worker so far
        target = min(range(worker_count), key=worker_loads.__getitem__)
        workers[target].append(pid)
        worker_loads[target] += load
    return workers
# Spread 8 partitions over 3 workers and show each worker's share
workers = balance_processing_load(8, 3)
for i, partitions in enumerate(workers):
    print(f"Worker {i}: partitions {partitions}")
Catchup Detection
Detect when a partition is behind:
def detect_lagging_partitions(partition_count, threshold=1000):
    """List partitions trailing the middle sequence by more than ``threshold``.

    Empty partitions are ignored. The reference point is the element at
    index ``len // 2`` of the sorted sequences (the upper of the two middle
    values when the count is even). Each result is a dict with the keys
    ``partition_id``, ``current_sequence`` and ``behind_by``.
    """
    observed = []
    for pid in range(partition_count):
        seq = client.execute_command('EPSEQ', pid)
        if seq is not None:
            observed.append((pid, seq))

    if not observed:
        return []

    # Middle value of the sorted sequences
    ordered = sorted(seq for _, seq in observed)
    median = ordered[len(ordered) // 2]

    # Keep only partitions whose gap to the middle exceeds the threshold
    return [
        {
            'partition_id': pid,
            'current_sequence': seq,
            'behind_by': median - seq
        }
        for pid, seq in observed
        if median - seq > threshold
    ]
# Flag any of the 32 partitions that trail by more than 500 events
lagging = detect_lagging_partitions(32, threshold=500)
if lagging:
    print("Lagging partitions detected:")
    for info in lagging:
        print(f" Partition {info['partition_id']}: {info['behind_by']} events behind")
Performance Notes
- O(1) operation: Very fast, reads from in-memory metadata
- No disk I/O: Sequence numbers are cached in memory
- Cluster-aware: Works across distributed nodes
- Real-time: Reflects current state immediately
Error Responses
Invalid Partition ID
EPSEQ 99999
-INVALID_PARTITION Partition ID 99999 is out of range (0-31)
Invalid UUID Format
EPSEQ invalid-uuid
-SYNTAX_ERROR Invalid UUID format for partition key
Monitoring Example
Complete monitoring script:
#!/usr/bin/env python3
import redis
import time
import json
from datetime import datetime
class PartitionMonitor:
    """Polls EPSEQ for every partition and prints a sequence/throughput table.

    Attributes:
        client: redis-py connection used to issue EPSEQ.
        partition_count: Number of partitions polled (IDs 0..count-1).
        previous_sequences: Sequences from the previous poll, keyed by
            partition ID; empty until the first poll has completed.
    """

    def __init__(self, host='localhost', port=9090, partition_count=32):
        self.client = redis.Redis(host=host, port=port, protocol=3)
        self.partition_count = partition_count
        self.previous_sequences = {}

    def get_all_sequences(self):
        """Return ``{partition_id: sequence_or_None}`` for all partitions."""
        return {
            partition_id: self.client.execute_command('EPSEQ', partition_id)
            for partition_id in range(self.partition_count)
        }

    def calculate_rates(self, current_sequences, interval=1):
        """Compute events per second for each partition since the last poll.

        Args:
            current_sequences: Mapping of partition ID to current sequence
                (``None`` for an empty partition).
            interval: Seconds elapsed since ``previous_sequences`` was
                captured. Defaults to 1, which yields the raw delta.

        Returns:
            Mapping of partition ID to events/second. A partition with no
            baseline yet (first poll) or that is still empty reports 0.

        Raises:
            ValueError: If ``interval`` is not positive.
        """
        if interval <= 0:
            raise ValueError("interval must be positive")

        rates = {}
        for partition_id, current_seq in current_sequences.items():
            # Without a baseline the delta would be the partition's entire
            # history, which is not a rate; report 0 until the next poll.
            if current_seq is None or partition_id not in self.previous_sequences:
                rates[partition_id] = 0
                continue
            # A partition that was empty last time starts counting from 0.
            previous_seq = self.previous_sequences[partition_id] or 0
            rates[partition_id] = max(0, current_seq - previous_seq) / interval
        return rates

    def print_status(self, sequences, rates):
        """Print a timestamped table of sequence, rate and status per partition."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n=== Partition Status at {timestamp} ===")
        total_events = sum(seq for seq in sequences.values() if seq is not None)
        total_rate = sum(rates.values())
        print(f"Total Events: {total_events:,}")
        print(f"Total Rate: {total_rate:.1f} events/sec")
        print()
        print("Partition | Sequence | Rate/sec | Status")
        print("-" * 40)
        for partition_id in range(self.partition_count):
            seq = sequences.get(partition_id)
            rate = rates.get(partition_id, 0)
            if seq is None:
                status = "Empty"
                seq_str = "-"
            elif rate == 0:
                status = "Idle"
                seq_str = f"{seq:,}"
            else:
                status = "Active"
                seq_str = f"{seq:,}"
            print(f"{partition_id:9} | {seq_str:8} | {rate:8.1f} | {status}")

    def monitor(self, interval=1):
        """Poll and print status every ``interval`` seconds until Ctrl+C."""
        print(f"Starting partition monitor (interval: {interval}s)")
        print("Press Ctrl+C to stop")
        last_poll = None
        try:
            while True:
                current_sequences = self.get_all_sequences()
                now = time.monotonic()
                # Divide by the time that actually passed, not the nominal
                # interval, since polling every partition takes time too.
                elapsed = interval if last_poll is None else now - last_poll
                rates = self.calculate_rates(current_sequences, max(elapsed, 1e-6))
                self.print_status(current_sequences, rates)
                self.previous_sequences = current_sequences
                last_poll = now
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\nMonitoring stopped")
if __name__ == "__main__":
    # Monitor with the default host, port and partition count, polling every 5 seconds.
    PartitionMonitor().monitor(interval=5)