Presence
TL;DR
Presence systems track and broadcast each user's online status, activity state, and related metadata in real time. Key challenges include accurate detection (telling a real disconnect apart from a transient network hiccup), scalability (millions of users), consistency (eventual consistency is usually acceptable), and privacy controls. Common implementations use heartbeats, TTL-based expiration, and pub/sub to broadcast changes.
Presence States
┌─────────────────────────────────────────────────────────────────────┐
│                           Presence States                           │
│                                                                     │
│   ┌─────────┐      ┌─────────┐      ┌─────────┐      ┌─────────┐    │
│   │ Online  │      │  Away   │      │  Busy   │      │ Offline │    │
│   │   🟢    │      │   🟡    │      │   🔴    │      │   ⚫    │    │
│   └────┬────┘      └────┬────┘      └────┬────┘      └────┬────┘    │
│        │                │                │                │         │
│    Connected        Idle for         User-set       Disconnected    │
│   and active        5 minutes         status        or timed out    │
│                                                                     │
│   Extended States:                                                  │
│   • In a call 🎧                                                    │
│   • In a meeting 📅                                                 │
│   • Presenting 🖥️                                                   │
│   • Focus mode 🔕                                                   │
│   • Custom status 💬                                                │
└─────────────────────────────────────────────────────────────────────┘
Basic Architecture
┌─────────────────────────────────────────────────────────────────────┐
│                           Presence System                           │
│                                                                     │
│   Clients                   Servers                    Storage      │
│                                                                     │
│  ┌────────┐              ┌────────────┐             ┌───────────┐   │
│  │Client A│──heartbeat──►│  Presence  │───update───►│   Redis   │   │
│  │        │◄─presence────│   Server   │◄──query─────│  Cluster  │   │
│  └────────┘              └─────┬──────┘             └───────────┘   │
│                                │                                    │
│  ┌────────┐                    │                                    │
│  │Client B│──subscribe────────►│                                    │
│  │        │◄─changes───────────│                                    │
│  └────────┘                    │                                    │
│                                │                                    │
│                         ┌──────┴──────┐                             │
│                         │   Pub/Sub   │                             │
│                         │   Channel   │                             │
│                         └─────────────┘                             │
└─────────────────────────────────────────────────────────────────────┘
Implementation
Presence Service (Python)
import asyncio
import redis.asyncio as redis
import json
import time
from dataclasses import dataclass, asdict
from typing import Dict, Set, Optional, List
from enum import Enum
class PresenceStatus(Enum):
    """Coarse availability states a user can be in."""

    ONLINE = "online"
    AWAY = "away"
    BUSY = "busy"
    OFFLINE = "offline"


@dataclass
class PresenceInfo:
    """Snapshot of a single user's presence.

    Instances round-trip through plain dictionaries via :meth:`to_dict` and
    :meth:`from_dict`, which is how they are stored and published.
    """

    user_id: str
    status: PresenceStatus
    last_seen: float
    device: str
    custom_status: Optional[str] = None
    activity: Optional[str] = None  # e.g., "In a call"

    def to_dict(self) -> dict:
        """Return a JSON-friendly dict; ``status`` is emitted as its string value.

        ``custom_status`` and ``activity`` are included even when they are
        ``None``.
        """
        return {
            'user_id': self.user_id,
            'status': self.status.value,
            'last_seen': self.last_seen,
            'device': self.device,
            'custom_status': self.custom_status,
            'activity': self.activity,
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'PresenceInfo':
        """Build a ``PresenceInfo`` from a dict shaped like :meth:`to_dict` output.

        ``last_seen`` is coerced with ``float()`` so that values that arrive
        as strings (for example after being read back from a string-only
        store) are accepted without pre-conversion by the caller.

        Raises:
            KeyError: if ``user_id``, ``status``, ``last_seen`` or ``device``
                is missing.
            ValueError: if ``status`` is not a known ``PresenceStatus`` value
                or ``last_seen`` is not numeric.
        """
        return cls(
            user_id=data['user_id'],
            status=PresenceStatus(data['status']),
            last_seen=float(data['last_seen']),
            device=data['device'],
            custom_status=data.get('custom_status'),
            activity=data.get('activity'),
        )
class PresenceService:
    """
    Scalable presence service using Redis.

    Per-user state lives in two keys:

    * ``presence:{user_id}`` - hash holding the serialized ``PresenceInfo``.
    * ``presence:heartbeat:{user_id}`` - marker key that is rewritten with a
      TTL of ``heartbeat_ttl`` seconds whenever a non-offline presence is
      stored. When the marker is missing, reads report the user as OFFLINE
      even if the hash still holds another status.

    Changes are announced as JSON on the ``presence:updates`` pub/sub
    channel. Nothing is published when a marker merely expires; subscribers
    only hear about transitions that pass through this service.
    """

    def __init__(
        self,
        redis_url: str = 'redis://localhost:6379',
        heartbeat_ttl: int = 30,  # Seconds before considered offline
        away_threshold: int = 300,  # Seconds of inactivity before away
    ):
        self.redis_url = redis_url
        # Populated by connect(); every other method needs it.
        self.redis: Optional[redis.Redis] = None
        self.heartbeat_ttl = heartbeat_ttl
        # Stored for callers; this class never switches a user to AWAY on
        # its own - that only happens through set_status().
        self.away_threshold = away_threshold
        # Keys
        self.presence_key = "presence:{user_id}"
        self.heartbeat_key = "presence:heartbeat:{user_id}"
        self.channel_key = "presence:updates"

    async def connect(self):
        """Initialize Redis connection."""
        self.redis = redis.from_url(self.redis_url)

    async def set_online(
        self,
        user_id: str,
        device: str,
        custom_status: Optional[str] = None
    ) -> "PresenceInfo":
        """Mark user as online, replacing any stored presence, and notify."""
        presence = PresenceInfo(
            user_id=user_id,
            status=PresenceStatus.ONLINE,
            last_seen=time.time(),
            device=device,
            custom_status=custom_status
        )
        await self._update_presence(presence)
        return presence

    async def heartbeat(self, user_id: str, device: str) -> "PresenceInfo":
        """
        Update heartbeat to maintain online status.
        Should be called every 10-15 seconds.

        The stored status is kept as-is (a BUSY user stays BUSY) while
        ``last_seen``/``device`` are refreshed and the marker TTL is re-armed.
        A user with no stored presence is created as ONLINE. A stored
        OFFLINE status is left untouched; use ``set_online``/``set_status``
        to leave it.

        Subscribers are notified only when the user becomes visible again
        (new user, or the marker had expired), not on every heartbeat.
        """
        now = time.time()
        raw, heartbeat_alive = await self._read_raw(user_id)
        if not raw:
            current = PresenceInfo(
                user_id=user_id,
                status=PresenceStatus.ONLINE,
                last_seen=now,
                device=device
            )
            became_visible = True
        else:
            # Work from the *stored* status, not the derived one: writing a
            # derived OFFLINE back would skip the TTL refresh below and leave
            # the user offline until the next set_online().
            current = self._decode_presence(raw)
            current.last_seen = now
            current.device = device
            became_visible = (
                not heartbeat_alive
                and current.status != PresenceStatus.OFFLINE
            )
        await self._update_presence(current, notify=became_visible)
        return current

    async def set_status(
        self,
        user_id: str,
        status: "PresenceStatus",
        custom_status: Optional[str] = None,
        activity: Optional[str] = None
    ):
        """Set user status explicitly.

        Does nothing when the user has no stored presence. ``custom_status``
        and ``activity`` are overwritten with the given values, including
        ``None``. Subscribers are notified when the visible status, custom
        status or activity actually changed.
        """
        raw, heartbeat_alive = await self._read_raw(user_id)
        if not raw:
            return
        current = self._decode_presence(raw)
        # What other users could see before this call.
        visible_status = (
            current.status if heartbeat_alive else PresenceStatus.OFFLINE
        )
        before = (visible_status, current.custom_status, current.activity)

        current.status = status
        current.custom_status = custom_status
        current.activity = activity
        current.last_seen = time.time()
        after = (status, custom_status, activity)
        await self._update_presence(current, notify=before != after)

    async def set_offline(self, user_id: str):
        """Mark user as offline, drop the heartbeat marker, and notify."""
        presence = PresenceInfo(
            user_id=user_id,
            status=PresenceStatus.OFFLINE,
            last_seen=time.time(),
            device=""
        )
        # Remove heartbeat key
        await self.redis.delete(self.heartbeat_key.format(user_id=user_id))
        await self._update_presence(presence)

    async def _update_presence(
        self,
        presence: "PresenceInfo",
        notify: bool = True
    ):
        """Update presence in Redis and optionally notify subscribers."""
        user_id = presence.user_id
        hash_key = self.presence_key.format(user_id=user_id)
        fields = presence.to_dict()
        # Redis cannot store None: write the populated fields and delete the
        # empty ones so a previously stored value does not linger.
        populated, empty = self._partition_fields(fields)

        pipe = self.redis.pipeline()
        pipe.hset(hash_key, mapping=populated)
        if empty:
            pipe.hdel(hash_key, *empty)
        # Set heartbeat with TTL
        if presence.status != PresenceStatus.OFFLINE:
            pipe.setex(
                self.heartbeat_key.format(user_id=user_id),
                self.heartbeat_ttl,
                "1"
            )
        await pipe.execute()
        # Publish update
        if notify:
            await self.redis.publish(self.channel_key, json.dumps(fields))

    async def get_presence(self, user_id: str) -> Optional["PresenceInfo"]:
        """Get user's current presence, or ``None`` if nothing is stored.

        The returned status is OFFLINE when the heartbeat marker has expired.
        """
        raw, heartbeat_alive = await self._read_raw(user_id)
        return self._effective_presence(raw, heartbeat_alive)

    async def get_bulk_presence(
        self,
        user_ids: List[str]
    ) -> Dict[str, "PresenceInfo"]:
        """Get presence for multiple users efficiently.

        Users with no stored presence are omitted from the result.
        """
        if not user_ids:
            return {}
        pipe = self.redis.pipeline()
        for user_id in user_ids:
            pipe.hgetall(self.presence_key.format(user_id=user_id))
            pipe.exists(self.heartbeat_key.format(user_id=user_id))
        results = await pipe.execute()

        presences = {}
        # Results alternate: hash, marker flag, hash, marker flag, ...
        for user_id, raw, alive in zip(user_ids, results[0::2], results[1::2]):
            presence = self._effective_presence(raw, alive)
            if presence is not None:
                presences[user_id] = presence
        return presences

    async def subscribe_presence_updates(self):
        """Subscribe to presence change events.

        Async generator yielding a ``PresenceInfo`` per published update.
        The pub/sub connection is released when the generator is closed.
        """
        async with self.redis.pubsub() as pubsub:
            await pubsub.subscribe(self.channel_key)
            async for message in pubsub.listen():
                if message['type'] == 'message':
                    yield PresenceInfo.from_dict(json.loads(message['data']))

    async def _read_raw(self, user_id: str):
        """Fetch the stored hash and the marker-exists flag in one round trip."""
        pipe = self.redis.pipeline()
        pipe.hgetall(self.presence_key.format(user_id=user_id))
        pipe.exists(self.heartbeat_key.format(user_id=user_id))
        raw, heartbeat_alive = await pipe.execute()
        return raw, bool(heartbeat_alive)

    @classmethod
    def _effective_presence(cls, raw, heartbeat_alive) -> Optional["PresenceInfo"]:
        """Decode a stored hash and apply the expired-marker override."""
        if not raw:
            return None
        presence = cls._decode_presence(raw)
        # Check if heartbeat expired (actually offline)
        if not heartbeat_alive and presence.status != PresenceStatus.OFFLINE:
            presence.status = PresenceStatus.OFFLINE
        return presence

    @classmethod
    def _decode_presence(cls, raw: dict) -> "PresenceInfo":
        """Turn an ``HGETALL`` reply into a ``PresenceInfo``."""
        decoded = {cls._to_text(k): cls._to_text(v) for k, v in raw.items()}
        decoded['last_seen'] = float(decoded['last_seen'])
        return PresenceInfo.from_dict(decoded)

    @staticmethod
    def _to_text(value):
        """Decode ``bytes`` replies; pass already-decoded values through."""
        return value.decode() if isinstance(value, bytes) else value

    @staticmethod
    def _partition_fields(fields: dict):
        """Split ``fields`` into (mapping of non-None values, names that are None)."""
        populated = {k: v for k, v in fields.items() if v is not None}
        empty = [k for k, v in fields.items() if v is None]
        return populated, empty
# Client Integration
/**
 * Browser-side presence client.
 *
 * Opens a WebSocket to the presence endpoint, announces the user as online,
 * sends a heartbeat every HEARTBEAT_INTERVAL ms, and switches the user
 * between 'online' and 'away' based on DOM activity and tab visibility.
 * Incoming 'presence_update' / 'initial_presence' messages are forwarded to
 * the `onPresenceChange` callback registered through `on()`.
 */
class PresenceClient {
  constructor(wsUrl, userId, device) {
    this.wsUrl = wsUrl;
    this.userId = userId;
    this.device = device;
    this.ws = null;
    this.heartbeatInterval = null;
    this.activityTimeout = null;
    this.activityListeners = null; // non-null while DOM listeners are attached
    this.isActive = true;
    this.HEARTBEAT_INTERVAL = 15000; // 15 seconds
    this.ACTIVITY_TIMEOUT = 300000; // 5 minutes
    this.HIDDEN_TAB_TIMEOUT = 60000; // 1 minute when tab hidden
    this.callbacks = {
      onPresenceChange: null,
      onStatusChange: null
    };
  }

  /**
   * Open the socket. The returned promise settles once the connection is
   * actually open (or has failed), so callers can safely send right after
   * `await connect()`.
   */
  async connect() {
    await new Promise((resolve, reject) => {
      let opened = false;
      const ws = new WebSocket(this.wsUrl);
      this.ws = ws;

      ws.onopen = () => {
        opened = true;
        this.setOnline();
        this.startHeartbeat();
        this.setupActivityDetection();
        resolve();
      };
      ws.onmessage = (event) => {
        let message;
        try {
          message = JSON.parse(event.data);
        } catch (err) {
          console.warn('PresenceClient: ignoring malformed message', err);
          return;
        }
        this.handleMessage(message);
      };
      ws.onerror = () => {
        if (!opened) {
          reject(new Error('Presence WebSocket failed to connect'));
        }
      };
      ws.onclose = () => {
        this.stopHeartbeat();
        this.teardownActivityDetection();
        if (!opened) {
          reject(new Error('Presence WebSocket closed before opening'));
        }
      };
    });
  }

  /**
   * Send a JSON message if the socket is open.
   * Returns true when the message was handed to the socket, false otherwise
   * (timers can still fire after the socket has closed).
   */
  _send(payload) {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      return false;
    }
    this.ws.send(JSON.stringify(payload));
    return true;
  }

  setOnline() {
    return this._send({
      type: 'set_online',
      device: this.device
    });
  }

  setStatus(status, customStatus = null, activity = null) {
    return this._send({
      type: 'set_status',
      status,
      custom_status: customStatus,
      activity
    });
  }

  startHeartbeat() {
    this.stopHeartbeat(); // never run two intervals at once
    this.heartbeatInterval = setInterval(() => {
      this._send({
        type: 'heartbeat',
        device: this.device
      });
    }, this.HEARTBEAT_INTERVAL);
  }

  stopHeartbeat() {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  /**
   * Attach DOM listeners that drive the online/away transitions.
   * Safe to call more than once; listeners are attached only once until
   * teardownActivityDetection() runs.
   */
  setupActivityDetection() {
    if (this.activityListeners) {
      return;
    }
    // Track user activity
    const activityEvents = ['mousedown', 'keydown', 'scroll', 'touchstart'];

    const markAway = () => {
      if (!this.isActive) {
        return; // already away - do not resend
      }
      this.isActive = false;
      this.setStatus('away');
    };
    // Single idle timer: always cancel the pending one before re-arming so
    // a stale timer cannot fire later.
    const armIdleTimer = (delay) => {
      clearTimeout(this.activityTimeout);
      this.activityTimeout = setTimeout(markAway, delay);
    };
    const handleActivity = () => {
      if (!this.isActive) {
        this.isActive = true;
        this.setStatus('online');
      }
      armIdleTimer(this.ACTIVITY_TIMEOUT);
    };
    // Handle visibility change
    const handleVisibility = () => {
      if (document.hidden) {
        // Tab hidden - might go away soon
        armIdleTimer(this.HIDDEN_TAB_TIMEOUT);
      } else {
        handleActivity();
      }
    };

    activityEvents.forEach(event => {
      document.addEventListener(event, handleActivity, { passive: true });
    });
    document.addEventListener('visibilitychange', handleVisibility);
    this.activityListeners = { activityEvents, handleActivity, handleVisibility };

    // Start counting immediately so a user who never touches the page
    // still goes 'away' after ACTIVITY_TIMEOUT.
    armIdleTimer(this.ACTIVITY_TIMEOUT);
  }

  /** Remove the DOM listeners and cancel the idle timer. */
  teardownActivityDetection() {
    clearTimeout(this.activityTimeout);
    this.activityTimeout = null;
    if (!this.activityListeners) {
      return;
    }
    const { activityEvents, handleActivity, handleVisibility } = this.activityListeners;
    activityEvents.forEach(event => {
      document.removeEventListener(event, handleActivity, { passive: true });
    });
    document.removeEventListener('visibilitychange', handleVisibility);
    this.activityListeners = null;
  }

  subscribeToUsers(userIds) {
    return this._send({
      type: 'subscribe',
      user_ids: userIds
    });
  }

  handleMessage(message) {
    switch (message.type) {
      case 'presence_update':
        if (this.callbacks.onPresenceChange) {
          this.callbacks.onPresenceChange(message.presence);
        }
        break;
      case 'initial_presence':
        if (this.callbacks.onPresenceChange) {
          Object.values(message.presences).forEach(presence => {
            this.callbacks.onPresenceChange(presence);
          });
        }
        break;
    }
  }

  on(event, callback) {
    this.callbacks[event] = callback;
  }

  disconnect() {
    this.stopHeartbeat();
    this.teardownActivityDetection();
    if (this.ws) {
      this.ws.close();
    }
  }
}
// Usage: create a client, react to presence changes, then connect and
// subscribe to the users whose presence should be shown.
const presence = new PresenceClient('wss://api.example.com/presence', 'user123', 'web');

// updateUserStatus is supplied by the embedding application.
const renderPresence = (presenceInfo) => {
  updateUserStatus(presenceInfo.user_id, presenceInfo.status);
};
presence.on('onPresenceChange', renderPresence);

await presence.connect();
presence.subscribeToUsers(['friend1', 'friend2', 'friend3']);
// Scaling Presence
Distributed Presence with Client-Side Sharding Across Redis Nodes
import hashlib
from typing import List
class ShardedPresenceService:
    """
    Presence service sharded across Redis nodes.
    Each user's presence is stored on a specific shard.

    The shard is chosen as ``md5(user_id) mod shard_count``. This is plain
    modulo hashing: changing the number of nodes changes the shard for most
    users. Only the presence hash is handled here; no heartbeat marker is
    written or checked.
    """

    def __init__(self, redis_nodes: List[str]):
        """Create one Redis client per URL in ``redis_nodes``.

        Raises:
            ValueError: if ``redis_nodes`` is empty (there would be no shard
                to map users onto).
        """
        self.shards = [
            redis.from_url(node)
            for node in redis_nodes
        ]
        if not self.shards:
            raise ValueError("redis_nodes must contain at least one Redis URL")
        self.shard_count = len(self.shards)

    def _shard_index(self, user_id: str) -> int:
        """Map a user id to a stable shard index in ``range(shard_count)``."""
        digest = hashlib.md5(user_id.encode()).hexdigest()
        return int(digest, 16) % self.shard_count

    def _get_shard(self, user_id: str) -> "redis.Redis":
        """Return the shard client responsible for ``user_id``."""
        return self.shards[self._shard_index(user_id)]

    @staticmethod
    def _parse_presence(raw: dict) -> "PresenceInfo":
        """Turn an ``HGETALL`` reply into a ``PresenceInfo``."""
        decoded = {
            (k.decode() if isinstance(k, bytes) else k):
                (v.decode() if isinstance(v, bytes) else v)
            for k, v in raw.items()
        }
        decoded['last_seen'] = float(decoded['last_seen'])
        return PresenceInfo.from_dict(decoded)

    async def set_presence(self, user_id: str, presence: "PresenceInfo"):
        """Store ``presence`` in the hash ``presence:{user_id}`` on the user's shard."""
        shard = self._get_shard(user_id)
        key = f"presence:{user_id}"
        fields = presence.to_dict()
        # Redis cannot store None: write populated fields, delete empty ones
        # so an older value does not survive the update.
        populated = {k: v for k, v in fields.items() if v is not None}
        empty = [k for k, v in fields.items() if v is None]
        pipe = shard.pipeline()
        pipe.hset(key, mapping=populated)
        if empty:
            pipe.hdel(key, *empty)
        await pipe.execute()

    async def get_bulk_presence(
        self,
        user_ids: List[str]
    ) -> Dict[str, "PresenceInfo"]:
        """Get presence across shards.

        Users are grouped by shard, each shard is queried with one pipeline,
        and the shards are queried concurrently. Users with no stored
        presence are omitted from the result.
        """
        # Group users by shard
        shard_users: Dict[int, List[str]] = {}
        for user_id in user_ids:
            shard_users.setdefault(self._shard_index(user_id), []).append(user_id)

        # Query each shard in parallel
        async def query_shard(shard_idx: int, users: List[str]):
            pipe = self.shards[shard_idx].pipeline()
            for user_id in users:
                pipe.hgetall(f"presence:{user_id}")
            results = await pipe.execute()
            return {
                user_id: self._parse_presence(result)
                for user_id, result in zip(users, results)
                if result
            }

        shard_results = await asyncio.gather(*(
            query_shard(shard_idx, users)
            for shard_idx, users in shard_users.items()
        ))

        # Merge results
        all_presences = {}
        for result in shard_results:
            all_presences.update(result)
        return all_presences
# Presence Fan-Out Optimization
class PresenceFanOutService:
    """
    Optimize presence updates for users with many subscribers.
    Instead of publishing to every subscriber, use channels.

    Users with more than ``channel_threshold`` subscribers are announced once
    on the pub/sub channel ``presence:channel:{user_id}``; everyone else has
    the update pushed onto each subscriber's list
    ``presence:updates:{subscriber_id}``.
    """

    def __init__(
        self,
        redis: "redis.Redis",
        channel_threshold: int = 1000,
        max_pending_updates: Optional[int] = None,
    ):
        """
        Args:
            redis: Redis client used for all operations.
            channel_threshold: Subscriber count above which the pub/sub
                channel is used instead of per-subscriber lists.
            max_pending_updates: When set, each subscriber list is trimmed
                to this many most-recent entries after every push. ``None``
                (the default) leaves the lists untrimmed, so they grow until
                something else consumes or deletes them.
        """
        self.redis = redis
        # Threshold for using channel-based delivery
        self.channel_threshold = channel_threshold
        self.max_pending_updates = max_pending_updates

    async def subscribe_to_user(
        self,
        subscriber_id: str,
        target_user_id: str
    ):
        """Subscribe to a user's presence updates.

        Records the relation in both directions: the target's subscriber set
        and the subscriber's set of followed users.
        """
        pipe = self.redis.pipeline()
        # Forward index: who is watching target_user_id
        pipe.sadd(
            f"presence:subscribers:{target_user_id}",
            subscriber_id
        )
        # Reverse index: whom subscriber_id is watching
        pipe.sadd(
            f"presence:subscribed:{subscriber_id}",
            target_user_id
        )
        await pipe.execute()

    async def notify_presence_change(self, presence: "PresenceInfo"):
        """Notify subscribers of presence change."""
        user_id = presence.user_id
        subscribers_key = f"presence:subscribers:{user_id}"
        # Serialize once; the same payload goes to every recipient.
        payload = json.dumps(presence.to_dict())

        # Check subscriber count
        subscriber_count = await self.redis.scard(subscribers_key)
        if subscriber_count > self.channel_threshold:
            # Many subscribers - use pub/sub channel
            await self.redis.publish(f"presence:channel:{user_id}", payload)
            return

        # Few subscribers - direct notification
        subscribers = await self.redis.smembers(subscribers_key)
        if not subscribers:
            return
        # One pipeline instead of one round trip per subscriber.
        pipe = self.redis.pipeline()
        for subscriber in subscribers:
            subscriber_id = (
                subscriber.decode() if isinstance(subscriber, bytes) else subscriber
            )
            inbox_key = f"presence:updates:{subscriber_id}"
            pipe.lpush(inbox_key, payload)
            if self.max_pending_updates is not None:
                # LPUSH puts the newest entry at index 0; keep the head.
                pipe.ltrim(inbox_key, 0, self.max_pending_updates - 1)
        await pipe.execute()
# Presence in Group Contexts
class ChannelPresenceService:
    """
    Track presence in channels/rooms/groups.
    Who is currently viewing a document, in a chat room, etc.

    Redis layout per channel:

    * ``channel:{id}:members`` - sorted set of user ids. The score is the
      time after which the member counts as gone unless refreshed
      (last join/heartbeat time + ``ttl``).
    * ``channel:{id}:member:{user_id}`` - the member's JSON record, stored
      with a TTL of ``ttl`` seconds. A member is active while this key exists.
    * ``channel:{id}:presence`` - pub/sub channel for joined/left events.

    Keying the sorted set by user id (rather than by a serialized record)
    keeps one entry per user and lets ``leave_channel`` remove it directly.
    """

    def __init__(self, redis: "redis.Redis", ttl: int = 60):
        self.redis = redis
        self.ttl = ttl  # Presence TTL in channel

    @staticmethod
    def _members_key(channel_id: str) -> str:
        """Key of the channel's sorted set of member ids."""
        return f"channel:{channel_id}:members"

    @staticmethod
    def _member_key(channel_id: str, user_id: str) -> str:
        """Key of one member's expiring JSON record."""
        return f"channel:{channel_id}:member:{user_id}"

    async def join_channel(
        self,
        channel_id: str,
        user_id: str,
        metadata: Optional[dict] = None
    ):
        """User joins a channel (or re-joins, replacing the earlier record)."""
        now = time.time()
        record = {
            'user_id': user_id,
            'joined_at': now,
            'last_seen': now,
        }
        # Caller metadata is merged in but may not overwrite the bookkeeping
        # fields above (a stray 'user_id' would corrupt member lookups).
        for key, value in (metadata or {}).items():
            record.setdefault(key, value)

        pipe = self.redis.pipeline()
        pipe.zadd(self._members_key(channel_id), {user_id: now + self.ttl})
        pipe.setex(
            self._member_key(channel_id, user_id),
            self.ttl,
            json.dumps(record)
        )
        await pipe.execute()
        # Notify channel
        await self.redis.publish(
            f"channel:{channel_id}:presence",
            json.dumps({
                'type': 'joined',
                'user_id': user_id,
                'metadata': metadata
            })
        )

    async def heartbeat_channel(self, channel_id: str, user_id: str):
        """Update presence in channel.

        Extends the member's record TTL and expiry score. A heartbeat for a
        user who is not currently a member changes nothing.
        """
        pipe = self.redis.pipeline()
        # xx=True: only touch an existing entry, never create one.
        pipe.zadd(
            self._members_key(channel_id),
            {user_id: time.time() + self.ttl},
            xx=True
        )
        pipe.expire(self._member_key(channel_id, user_id), self.ttl)
        await pipe.execute()

    async def leave_channel(self, channel_id: str, user_id: str):
        """User leaves channel."""
        pipe = self.redis.pipeline()
        pipe.delete(self._member_key(channel_id, user_id))
        pipe.zrem(self._members_key(channel_id), user_id)
        await pipe.execute()
        await self.redis.publish(
            f"channel:{channel_id}:presence",
            json.dumps({
                'type': 'left',
                'user_id': user_id
            })
        )

    async def get_channel_members(self, channel_id: str) -> List[dict]:
        """Get active members in channel.

        Entries whose expiry score has passed are removed from the sorted
        set as a side effect. Each returned record carries ``last_seen``
        derived from the member's most recent join/heartbeat.
        """
        members_key = self._members_key(channel_id)
        pipe = self.redis.pipeline()
        # Drop members that were never refreshed, then list the rest.
        pipe.zremrangebyscore(members_key, '-inf', time.time())
        pipe.zrange(members_key, 0, -1, withscores=True)
        _, entries = await pipe.execute()
        if not entries:
            return []

        user_ids = [
            member.decode() if isinstance(member, bytes) else member
            for member, _ in entries
        ]
        # One MGET instead of one EXISTS round trip per member.
        records = await self.redis.mget(
            [self._member_key(channel_id, user_id) for user_id in user_ids]
        )

        active_members = []
        for (_, expires_at), raw in zip(entries, records):
            if raw is None:
                continue  # record key already expired
            data = json.loads(raw)
            if not isinstance(data, dict):
                continue  # not a member record written by join_channel
            data['last_seen'] = expires_at - self.ttl
            active_members.append(data)
        return active_members

    async def get_member_count(self, channel_id: str) -> int:
        """Get active member count (approximate).

        Counts sorted-set entries whose expiry score is still in the future;
        a single command, no keyspace scan.
        """
        return await self.redis.zcount(
            self._members_key(channel_id),
            time.time(),
            '+inf'
        )
# Real-time document collaboration
class DocumentPresence:
    """Track who is viewing/editing a document.

    Thin layer over a channel-presence service: each document ``doc_id`` maps
    to the channel ``doc:{doc_id}``, and cursor moves are published on
    ``doc:{doc_id}:cursors``.
    """

    def __init__(self, channel_presence: "ChannelPresenceService"):
        self.presence = channel_presence

    async def _join(self, doc_id: str, user_id: str, cursor, mode: str):
        """Join the document's channel with the given cursor and mode."""
        await self.presence.join_channel(
            f"doc:{doc_id}",
            user_id,
            {'cursor': cursor, 'mode': mode}
        )

    async def start_viewing(
        self,
        doc_id: str,
        user_id: str,
        cursor: Optional[dict] = None
    ):
        """Register ``user_id`` as a viewer of the document."""
        await self._join(doc_id, user_id, cursor, 'viewing')

    async def start_editing(self, doc_id: str, user_id: str, cursor: dict):
        """Register ``user_id`` as an editor of the document."""
        await self._join(doc_id, user_id, cursor, 'editing')

    async def update_cursor(self, doc_id: str, user_id: str, cursor: dict):
        """Broadcast a cursor move to the document's cursor channel."""
        # This class holds no Redis client of its own; publish through the
        # client owned by the wrapped channel-presence service.
        await self.presence.redis.publish(
            f"doc:{doc_id}:cursors",
            json.dumps({
                'user_id': user_id,
                'cursor': cursor
            })
        )
# Privacy Controls
from enum import Enum
from typing import Optional
class PresenceVisibility(Enum):
    """Per-user setting for who may see that user's presence."""

    # No restriction on who can look the user up.
    EVERYONE = "everyone"
    # Restricted to the user's contacts.
    CONTACTS_ONLY = "contacts"
    # Hidden from other users.
    NOBODY = "nobody"
class PrivatePresenceService(PresenceService):
    """Presence with privacy controls.

    Each user's visibility is stored in the hash ``user:{user_id}:settings``
    under the field ``presence_visibility``; users without a stored value are
    treated as ``EVERYONE``. A user can always read their own presence.

    Contact relationships are not stored by this class: override
    ``_is_contact`` (and optionally ``_get_mutual_contacts``) to connect it
    to the application's contact store.
    """

    async def get_presence(
        self,
        requester_id: str,
        target_user_id: Optional[str] = None
    ) -> Optional[PresenceInfo]:
        """Get presence with privacy check.

        Called with a single id (the form the base class uses internally) or
        with ``requester_id == target_user_id`` this is a self-lookup and no
        privacy filter applies. Otherwise returns ``None`` when the target's
        visibility setting hides them from ``requester_id``.
        """
        if target_user_id is None or target_user_id == requester_id:
            return await super().get_presence(requester_id)

        # Get target's privacy settings
        visibility = await self._get_visibility_setting(target_user_id)
        if visibility == PresenceVisibility.NOBODY:
            return None
        if visibility == PresenceVisibility.CONTACTS_ONLY:
            is_contact = await self._is_contact(target_user_id, requester_id)
            if not is_contact:
                return None
        # Privacy check passed, get presence
        return await super().get_presence(target_user_id)

    async def get_bulk_presence(
        self,
        requester_id: str,
        user_ids: List[str]
    ) -> Dict[str, PresenceInfo]:
        """Bulk presence with privacy filtering.

        Users hidden from ``requester_id`` are omitted from the result, as
        are users with no stored presence.
        """
        if not user_ids:
            return {}
        # Get all visibility settings
        settings = await self._get_bulk_visibility(user_ids)

        # Filter based on privacy
        allowed_users = []
        contacts_check_users = []
        for user_id in user_ids:
            if user_id == requester_id:
                allowed_users.append(user_id)  # own presence is always visible
                continue
            visibility = settings.get(user_id, PresenceVisibility.EVERYONE)
            if visibility == PresenceVisibility.EVERYONE:
                allowed_users.append(user_id)
            elif visibility == PresenceVisibility.CONTACTS_ONLY:
                contacts_check_users.append(user_id)

        # Check contacts for those requiring it
        if contacts_check_users:
            contacts = await self._get_mutual_contacts(
                requester_id,
                contacts_check_users
            )
            allowed_users.extend(contacts)

        # Get presence for allowed users only
        return await super().get_bulk_presence(allowed_users)

    async def set_visibility(
        self,
        user_id: str,
        visibility: PresenceVisibility
    ):
        """Set user's presence visibility."""
        await self.redis.hset(
            f"user:{user_id}:settings",
            "presence_visibility",
            visibility.value
        )

    @staticmethod
    def _parse_visibility(raw) -> PresenceVisibility:
        """Convert a stored setting into a ``PresenceVisibility``.

        A missing value maps to ``EVERYONE``, matching the default used by
        the bulk lookup.

        Raises:
            ValueError: if the stored value is not a known visibility.
        """
        if raw is None:
            return PresenceVisibility.EVERYONE
        if isinstance(raw, bytes):
            raw = raw.decode()
        return PresenceVisibility(raw)

    async def _get_visibility_setting(self, user_id: str) -> PresenceVisibility:
        """Read one user's visibility setting (the value written by set_visibility)."""
        raw = await self.redis.hget(
            f"user:{user_id}:settings",
            "presence_visibility"
        )
        return self._parse_visibility(raw)

    async def _get_bulk_visibility(
        self,
        user_ids: List[str]
    ) -> Dict[str, PresenceVisibility]:
        """Read the visibility setting of several users in one pipeline."""
        pipe = self.redis.pipeline()
        for user_id in user_ids:
            pipe.hget(f"user:{user_id}:settings", "presence_visibility")
        raw_values = await pipe.execute()
        return {
            user_id: self._parse_visibility(raw)
            for user_id, raw in zip(user_ids, raw_values)
        }

    async def _is_contact(self, target_user_id: str, requester_id: str) -> bool:
        """Return whether ``requester_id`` is a contact of ``target_user_id``.

        Extension point: the contact store is application-specific, so this
        must be overridden before ``CONTACTS_ONLY`` visibility can be served.
        """
        raise NotImplementedError(
            "PrivatePresenceService._is_contact must be overridden to "
            "consult the application's contact store"
        )

    async def _get_mutual_contacts(
        self,
        requester_id: str,
        user_ids: List[str]
    ) -> List[str]:
        """Return the subset of ``user_ids`` that count ``requester_id`` as a contact.

        Default implementation asks ``_is_contact`` once per user; override
        it when the contact store offers a cheaper batch lookup.
        """
        flags = await asyncio.gather(*(
            self._is_contact(user_id, requester_id)
            for user_id in user_ids
        ))
        return [user_id for user_id, allowed in zip(user_ids, flags) if allowed]
# Key Takeaways
Heartbeats with TTL: Use time-limited keys to automatically detect offline users without explicit disconnect
Eventual consistency is OK: Presence doesn't need strong consistency; a few seconds delay is acceptable
Optimize for reads: Presence is read-heavy; denormalize and cache for efficient bulk lookups
Channel-based fan-out: For popular users, use pub/sub channels instead of individual notifications
Activity detection: Combine heartbeats with actual user activity (mouse, keyboard) for accurate away status
Privacy by default: Implement visibility controls; not everyone should see everyone's status
Context-specific presence: Track presence in channels/rooms for collaborative features, not just global status