Skip to main content

EPSEQ

Get the current sequence number for a partition.

Syntax

EPSEQ <partition>

Parameters

ParameterTypeRequiredDescription
partitionPartition SelectorYesPartition selector (partition ID 0-65535 or UUID key)

Partition Selector

You can specify partitions in two ways:

  • Partition ID: Integer from 0 to partition_count-1 (e.g., 42)
  • Partition Key: UUID that maps to a partition (e.g., 550e8400-e29b-41d4-a716-446655440000)

Return Value

Integer or Null: Current sequence number of the partition, or null if no events exist in the partition.

  • If partition has events: Returns the current sequence number (starting from 1)
  • If partition is empty: Returns null

Examples

Get Partition Sequence by ID

EPSEQ 42

Returns:

1234

Get Partition Sequence by UUID

EPSEQ 550e8400-e29b-41d4-a716-446655440000

Returns:

567

Empty Partition

EPSEQ 99

Returns:

null

Client Examples

Python

import redis

client = redis.Redis(host='localhost', port=9090, protocol=3)

def get_partition_sequence(partition_id):
"""Get current sequence for a partition"""
sequence = client.execute_command('EPSEQ', partition_id)
return sequence

def wait_for_partition_progress(partition_id, target_sequence, timeout=30):
"""Wait for partition to reach a target sequence"""
import time

start_time = time.time()
while time.time() - start_time < timeout:
current = get_partition_sequence(partition_id)
if current is not None and current >= target_sequence:
return True
time.sleep(0.1)
return False

# Check current progress
current_seq = get_partition_sequence(5)
print(f"Partition 5 current sequence: {current_seq}")

# Wait for specific progress
if wait_for_partition_progress(5, 1000):
print("Partition reached target sequence")
else:
print("Timeout waiting for partition progress")

JavaScript

const redis = require('redis');

const client = redis.createClient({
socket: { host: 'localhost', port: 9090 },
RESP: 3
});

await client.connect();

async function getPartitionStatus(partitionIds) {
const status = {};

for (const partitionId of partitionIds) {
const sequence = await client.sendCommand(['EPSEQ', partitionId.toString()]);
status[partitionId] = sequence;
}

return status;
}

// Check status of multiple partitions
const partitions = [0, 1, 2, 3, 4, 5, 6, 7];
const status = await getPartitionStatus(partitions);

console.log('Partition Status:');
for (const [partition, sequence] of Object.entries(status)) {
console.log(` Partition ${partition}: ${sequence || 'empty'}`);
}

await client.disconnect();

Rust

use redis::{Commands, RedisResult};
use uuid::Uuid;

fn get_partition_sequence(partition_id: u16) -> RedisResult<Option<u64>> {
let client = redis::Client::open("redis://127.0.0.1:9090/")?;
let mut con = client.get_connection()?;

let sequence: Option<u64> = redis::cmd("EPSEQ")
.arg(partition_id)
.query(&mut con)?;

Ok(sequence)
}

fn get_partition_sequence_by_key(partition_key: &Uuid) -> RedisResult<Option<u64>> {
let client = redis::Client::open("redis://127.0.0.1:9090/")?;
let mut con = client.get_connection()?;

let sequence: Option<u64> = redis::cmd("EPSEQ")
.arg(partition_key.to_string())
.query(&mut con)?;

Ok(sequence)
}

fn main() -> RedisResult<()> {
// Check by partition ID
match get_partition_sequence(5)? {
Some(seq) => println!("Partition 5 sequence: {}", seq),
None => println!("Partition 5 is empty"),
}

// Check by partition key
let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();
match get_partition_sequence_by_key(&partition_key)? {
Some(seq) => println!("Partition key {} sequence: {}", partition_key, seq),
None => println!("Partition is empty"),
}

Ok(())
}

Use Cases

Monitoring Progress

Monitor partition progress across a cluster:

def monitor_cluster_progress(partition_count):
"""Monitor progress across all partitions"""
while True:
total_events = 0
slowest_partition = None
slowest_sequence = float('inf')

print("Partition Progress:")
for partition_id in range(partition_count):
sequence = client.execute_command('EPSEQ', partition_id)
if sequence is not None:
total_events += sequence
if sequence < slowest_sequence:
slowest_sequence = sequence
slowest_partition = partition_id
print(f" Partition {partition_id}: {sequence}")
else:
print(f" Partition {partition_id}: empty")

print(f"Total events: {total_events}")
if slowest_partition is not None:
print(f"Slowest partition: {slowest_partition} (seq {slowest_sequence})")

time.sleep(5)

Checkpoint Management

Track processing checkpoints:

class PartitionCheckpoint:
def __init__(self, partition_id):
self.partition_id = partition_id
self.last_processed_sequence = 0

def get_unprocessed_count(self):
"""Get number of unprocessed events"""
current_sequence = client.execute_command('EPSEQ', self.partition_id)
if current_sequence is None:
return 0
return max(0, current_sequence - self.last_processed_sequence)

def mark_processed(self, sequence):
"""Mark events up to sequence as processed"""
self.last_processed_sequence = sequence

def needs_processing(self):
"""Check if partition has unprocessed events"""
return self.get_unprocessed_count() > 0

# Use checkpoint tracking
checkpoint = PartitionCheckpoint(5)
if checkpoint.needs_processing():
unprocessed = checkpoint.get_unprocessed_count()
print(f"Partition 5 has {unprocessed} unprocessed events")

Load Balancing

Balance work across partitions:

def find_most_active_partition(partition_count):
"""Find partition with most recent activity"""
most_active = None
highest_sequence = -1

for partition_id in range(partition_count):
sequence = client.execute_command('EPSEQ', partition_id)
if sequence is not None and sequence > highest_sequence:
highest_sequence = sequence
most_active = partition_id

return most_active, highest_sequence

def balance_processing_load(partition_count, worker_count):
"""Distribute partitions among workers based on activity"""
partition_sequences = {}

for partition_id in range(partition_count):
sequence = client.execute_command('EPSEQ', partition_id)
partition_sequences[partition_id] = sequence or 0

# Sort partitions by sequence (activity)
sorted_partitions = sorted(partition_sequences.items(), key=lambda x: x[1], reverse=True)

# Distribute among workers
workers = [[] for _ in range(worker_count)]
worker_loads = [0] * worker_count

for partition_id, sequence in sorted_partitions:
# Assign to worker with lowest current load
worker_idx = worker_loads.index(min(worker_loads))
workers[worker_idx].append(partition_id)
worker_loads[worker_idx] += sequence

return workers

# Balance 8 partitions among 3 workers
workers = balance_processing_load(8, 3)
for i, partitions in enumerate(workers):
print(f"Worker {i}: partitions {partitions}")

Catchup Detection

Detect when a partition is behind:

def detect_lagging_partitions(partition_count, threshold=1000):
"""Detect partitions that are significantly behind"""
sequences = []

for partition_id in range(partition_count):
sequence = client.execute_command('EPSEQ', partition_id)
if sequence is not None:
sequences.append((partition_id, sequence))

if not sequences:
return []

# Calculate median sequence
sorted_sequences = sorted([seq for _, seq in sequences])
median = sorted_sequences[len(sorted_sequences) // 2]

# Find lagging partitions
lagging = []
for partition_id, sequence in sequences:
if median - sequence > threshold:
lagging.append({
'partition_id': partition_id,
'current_sequence': sequence,
'behind_by': median - sequence
})

return lagging

# Check for lagging partitions
lagging = detect_lagging_partitions(32, threshold=500)
if lagging:
print("Lagging partitions detected:")
for info in lagging:
print(f" Partition {info['partition_id']}: {info['behind_by']} events behind")

Performance Notes

  • O(1) operation: Very fast, reads from in-memory metadata
  • No disk I/O: Sequence numbers are cached in memory
  • Cluster-aware: Works across distributed nodes
  • Real-time: Reflects current state immediately

Error Responses

Invalid Partition ID

EPSEQ 99999
-INVALID_PARTITION Partition ID 99999 is out of range (0-31)

Invalid UUID Format

EPSEQ invalid-uuid
-SYNTAX_ERROR Invalid UUID format for partition key

Monitoring Example

Complete monitoring script:

#!/usr/bin/env python3
import redis
import time
import json
from datetime import datetime

class PartitionMonitor:
def __init__(self, host='localhost', port=9090, partition_count=32):
self.client = redis.Redis(host=host, port=port, protocol=3)
self.partition_count = partition_count
self.previous_sequences = {}

def get_all_sequences(self):
"""Get current sequence for all partitions"""
sequences = {}
for partition_id in range(self.partition_count):
seq = self.client.execute_command('EPSEQ', partition_id)
sequences[partition_id] = seq
return sequences

def calculate_rates(self, current_sequences):
"""Calculate event rates per partition"""
rates = {}
for partition_id, current_seq in current_sequences.items():
previous_seq = self.previous_sequences.get(partition_id, 0)
if current_seq is not None and previous_seq is not None:
rates[partition_id] = max(0, current_seq - previous_seq)
else:
rates[partition_id] = 0
return rates

def print_status(self, sequences, rates):
"""Print formatted status"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n=== Partition Status at {timestamp} ===")

total_events = sum(seq for seq in sequences.values() if seq is not None)
total_rate = sum(rates.values())

print(f"Total Events: {total_events:,}")
print(f"Total Rate: {total_rate} events/sec")
print()

print("Partition | Sequence | Rate/sec | Status")
print("-" * 40)

for partition_id in range(self.partition_count):
seq = sequences.get(partition_id)
rate = rates.get(partition_id, 0)

if seq is None:
status = "Empty"
seq_str = "-"
elif rate == 0:
status = "Idle"
seq_str = f"{seq:,}"
else:
status = "Active"
seq_str = f"{seq:,}"

print(f"{partition_id:9} | {seq_str:8} | {rate:8} | {status}")

def monitor(self, interval=1):
"""Start monitoring loop"""
print(f"Starting partition monitor (interval: {interval}s)")
print("Press Ctrl+C to stop")

try:
while True:
current_sequences = self.get_all_sequences()
rates = self.calculate_rates(current_sequences)
self.print_status(current_sequences, rates)

self.previous_sequences = current_sequences
time.sleep(interval)

except KeyboardInterrupt:
print("\nMonitoring stopped")

if __name__ == "__main__":
monitor = PartitionMonitor()
monitor.monitor(interval=5)
  • EPSCAN - Scan events from partition
  • ESVER - Get stream version
  • EAPPEND - Append events that increment sequences