Real-Time Collaborative Editing with CRDTs and AI — Sync Design That Resolves Conflicts with Math
Starting from a sync bug that silently erased user data, this deep dive covers CRDT-based conflict-free sync design: choosing between Yjs and Automerge, compaction against tombstone bloat, and keeping AI conflict resolution out of the sync path.
I once shipped multi-device sync for a favorites list using the simplest possible rule: the last device to save wins. This was in one of the wallpaper apps I build and run as an indie developer.
On paper, the design looked harmless. In practice, a user would curate their favorites offline on the train, reconnect to Wi-Fi at home, and watch the app silently overwrite everything they had added on another device. By the time the reports surfaced in reviews, the data was gone for good. Unlike a crash, a sync bug like this barely leaves a trace in the logs. I still remember the cold sweat.
That painful lesson is what pushed me to study CRDTs (Conflict-free Replicated Data Types) seriously. The "edit together without breaking anything" experience that Google Docs, Figma, and Notion make look effortless is not magic; it is mathematics. CRDTs are one branch of that mathematics (Google Docs, for example, is built on operational transformation rather than CRDTs): when merges are commutative, associative, and idempotent, every replica converges to the same state without a central arbiter. Once that property clicks, you never look at sync design the same way again.
In the sections that follow, we will work through how CRDTs make collaborative editing possible, a production-grade implementation built around Yjs, how to choose between Yjs and Automerge, the document-bloat problem you will inevitably hit in operation, and the design decisions involved in adding AI to the merge process — all with working code.
Understanding CRDTs: The Foundation
The Core Problem CRDTs Solve
In traditional client-server systems, a single server coordinates all changes. This works until you need offline functionality, P2P collaboration, or low-latency experiences. The moment you remove the server's coordinating power, you face the fundamental challenge: how do you merge concurrent edits?
User A: "Hello" → edits at position 0, inserts "Hi" → "HiHello"
User B: "Hello" → edits at position 0, inserts "Hey" → "HeyHello"
Question: When both changes sync, what's the final state?
Traditional approach: Server decides (requires round-trip)
CRDT approach: Mathematical rules ensure convergence
CRDTs solve this by making merges commutative, associative, and idempotent. The order in which operations arrive doesn't matter, and neither does receiving the same operation twice: the final state is always identical, regardless of network delays or concurrent edits.
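To make those properties concrete, here is a minimal sketch of a grow-only counter, one of the simplest CRDTs. The function names (`increment`, `merge`, `value`) and the replica names are illustrative assumptions, not part of any library. Each replica only ever raises its own slot, and merging takes the element-wise maximum.

```python
from typing import Dict

State = Dict[str, int]  # replica id -> how much that replica has counted


def increment(state: State, replica: str, amount: int = 1) -> State:
    """Return a new state in which `replica` has counted `amount` more."""
    updated = dict(state)
    updated[replica] = updated.get(replica, 0) + amount
    return updated


def merge(a: State, b: State) -> State:
    """Element-wise max: commutative, associative, and idempotent."""
    return {r: max(a.get(r, 0), b.get(r, 0)) for r in a.keys() | b.keys()}


def value(state: State) -> int:
    """The counter's value is the sum over all replicas."""
    return sum(state.values())


# Two devices count independently while offline
phone = increment({}, "phone", 2)
laptop = increment({}, "laptop", 3)

merged = merge(phone, laptop)
assert merged == merge(laptop, phone)   # order of arrival is irrelevant
assert merge(merged, laptop) == merged  # a redelivered state changes nothing
assert value(merged) == 5
```

Because `max` has all three properties, no coordinator is needed to decide who wins; every replica that has seen both states computes the same result.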
CRDT Types and Their Trade-offs
┌─────────────────────────────────────────────────────────┐
│ OPERATION-BASED (Commutative Replicated Data Types) │
├─────────────────────────────────────────────────────────┤
│ Broadcast operations → operations automatically │
│ commute/compose │
│ Best for: Counter, Set, Flag operations │
│ Network overhead: Low │
│ Complexity: Medium │
│ Example: Increment counter by 5 (order doesn't matter) │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ STATE-BASED (Convergent Replicated Data Types) │
├─────────────────────────────────────────────────────────┤
│ Broadcast state → merge happens at destination │
│ Best for: Complex documents, trees │
│ Network overhead: High │
│ Complexity: Lower for simple merges │
│ Example: Last-write-wins (but loses data) │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ SEQUENCE CRDTs (WOOT, RGA, YATA as used by Yjs) │
├─────────────────────────────────────────────────────────┤
│ Specialized for text/ordered sequences │
│ Assign unique IDs to each character │
│ Best for: Text editors, lists │
│ Network overhead: Very high initially │
│ Complexity: High │
│ Example: Every character gets (siteID, clock) ID │
└─────────────────────────────────────────────────────────┘
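The "(siteID, clock) ID per character" idea can be sketched in a few lines. This is a simplified, RGA-style illustration under stated assumptions: the names `Insert`, `typed`, and `render` are hypothetical, deletions are left out, and it is not how Yjs encodes documents internally. Each character records the ID of the character it was typed after, and concurrent siblings are ordered by ID so every replica renders the same text.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple

ElemId = Tuple[int, str]  # (Lamport clock, site id); unique per character


@dataclass(frozen=True)
class Insert:
    id: ElemId
    parent: Optional[ElemId]  # character this one follows; None = document start
    char: str


def typed(text: str, site: str, start_clock: int,
          after: Optional[ElemId] = None) -> List[Insert]:
    """Build the chain of inserts produced by typing `text` after `after`."""
    ops, parent = [], after
    for offset, ch in enumerate(text):
        op = Insert((start_clock + offset, site), parent, ch)
        ops.append(op)
        parent = op.id
    return ops


def render(ops: Iterable[Insert]) -> str:
    """Depth-first walk; siblings with the higher ID come first."""
    children: Dict[Optional[ElemId], List[Insert]] = {}
    for op in set(ops):  # the set makes duplicate deliveries harmless
        children.setdefault(op.parent, []).append(op)
    out: List[str] = []
    stack = sorted(children.get(None, []), key=lambda o: o.id)
    while stack:
        op = stack.pop()  # largest remaining ID first
        out.append(op.char)
        stack.extend(sorted(children.get(op.id, []), key=lambda o: o.id))
    return "".join(out)


base = typed("Hello", "s0", 1)  # shared starting text
a = typed("Hi", "A", 6)         # User A inserts at position 0
b = typed("Hey", "B", 6)        # User B inserts at position 0, concurrently

assert render(base + a + b) == render(b + a + base + a) == "HeyHiHello"
```

Both users' words survive intact, and the tie between them is broken by site ID rather than by arrival order, which is why neither a server round-trip nor a "last writer" is needed.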
💡
**Choosing a CRDT:** For text editing, use sequence CRDTs (Yjs, Automerge). For application state (user presence, cursors, selections), use operation-based CRDTs. Hybrid approaches combine both.
Implementing Yjs: The Production CRDT
Yjs is one of the most widely adopted CRDT libraries for collaborative text editing, with bindings for the major editor frameworks. Understanding Yjs teaches patterns that carry over to other CRDTs.
Core Architecture
```python
# crdt/yjs_bridge.py
"""Bridge between the Python backend and Yjs (the JavaScript CRDT).

In production, Yjs runs on the client; the server syncs and stores updates.
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class YjsUpdate:
    """Represents a change in a Yjs document"""
    origin_client_id: str
    clock: int  # Lamport clock for ordering
    content: bytes  # Encoded Yjs update
    timestamp: datetime = field(default_factory=datetime.utcnow)
    parent_clock: int = 0  # For causal ordering

    def to_dict(self) -> Dict[str, Any]:
        return {
            "origin_client_id": self.origin_client_id,
            "clock": self.clock,
            "content": self.content.hex(),  # Hex-encoded for JSON
            "timestamp": self.timestamp.isoformat(),
            "parent_clock": self.parent_clock,
        }


@dataclass
class CollaborativeDocument:
    """A document managed by CRDT synchronization"""
    doc_id: str
    client_id: str
    updates: List[YjsUpdate] = field(default_factory=list)
    vector_clock: Dict[str, int] = field(default_factory=dict)
    last_synced: Optional[datetime] = None

    def add_update(self, update: YjsUpdate) -> None:
        """Add a local change to the document"""
        self.updates.append(update)
        # Update vector clock for this client
        self.vector_clock[self.client_id] = (
            self.vector_clock.get(self.client_id, 0) + 1
        )

    def get_missing_updates(
        self, remote_vector_clock: Dict[str, int]
    ) -> List[YjsUpdate]:
        """Determine which updates haven't been seen by remote client"""
        missing = []
        for update in self.updates:
            remote_clock = remote_vector_clock.get(update.origin_client_id, 0)
            if update.clock > remote_clock:
                missing.append(update)
        return missing

    def merge_remote_update(self, update: YjsUpdate) -> None:
        """Incorporate a change from another client"""
        key = (update.clock, update.origin_client_id)
        # Idempotency: a redelivered update must not be stored twice
        if any((u.clock, u.origin_client_id) == key for u in self.updates):
            return
        # Insert in (clock, client) order so ties between clients are broken
        # the same way on every replica, whatever the arrival order
        insert_pos = len(self.updates)
        for i, existing in enumerate(self.updates):
            if (existing.clock, existing.origin_client_id) > key:
                insert_pos = i
                break
        self.updates.insert(insert_pos, update)
        # Update our vector clock
        self.vector_clock[update.origin_client_id] = max(
            self.vector_clock.get(update.origin_client_id, 0),
            update.clock,
        )


# Document synchronization protocol
class CRDTSyncManager:
    """Manages CRDT document synchronization across clients"""

    def __init__(self, storage_backend, max_batch_size: int = 100):
        self.storage = storage_backend
        self.max_batch_size = max_batch_size
        self.documents: Dict[str, CollaborativeDocument] = {}

    async def sync_document(
        self,
        doc_id: str,
        client_id: str,
        local_updates: List[YjsUpdate],
        remote_vector_clock: Dict[str, int],
    ) -> Dict[str, Any]:
        """
        Sync a document between client and server.

        The server acts as a "causal sequencer" but NOT a coordinator.
        All merge decisions are deterministic based on vector clocks.
        """
        doc = self.documents.get(doc_id)
        if not doc:
            doc = CollaborativeDocument(doc_id=doc_id, client_id=client_id)
            self.documents[doc_id] = doc

        # Merge incoming updates
        for update in local_updates:
            doc.merge_remote_update(update)

        # Find updates to send back
        missing_updates = doc.get_missing_updates(remote_vector_clock)

        # Batch for efficiency
        update_batches = [
            missing_updates[i:i + self.max_batch_size]
            for i in range(0, len(missing_updates), self.max_batch_size)
        ]

        # Persist to storage
        await self.storage.save_document(doc)

        return {
            "doc_id": doc_id,
            "acknowledged": len(local_updates),
            "updates_to_apply": update_batches,
            "server_vector_clock": doc.vector_clock,
            "needs_full_sync": self._should_trigger_full_sync(doc),
        }

    def _should_trigger_full_sync(self, doc: CollaborativeDocument) -> bool:
        """Detect situations requiring full document state"""
        # If document has grown very large, send state instead of ops
        total_update_size = sum(len(u.content) for u in doc.updates)
        return total_update_size > 10_000_000  # 10MB threshold
```
⚠️
**Storage Implications:** Storing raw CRDT updates forever bloats storage. Implement snapshots: periodically save the document's complete state, then discard the updates it covers. Example: snapshot every 1000 updates or daily.
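The "every 1000 updates or daily" rule can be expressed as a small policy function. This is a sketch under assumptions: `should_snapshot` and its parameters are hypothetical names, and the thresholds simply mirror the example numbers above.

```python
from datetime import datetime, timedelta, timezone


def should_snapshot(
    updates_since_snapshot: int,
    last_snapshot_at: datetime,
    now: datetime,
    max_updates: int = 1000,
    max_age: timedelta = timedelta(days=1),
) -> bool:
    """Snapshot when enough updates piled up or the last snapshot is stale."""
    if updates_since_snapshot == 0:
        return False  # nothing new to fold in, so skip the write
    if updates_since_snapshot >= max_updates:
        return True
    return now - last_snapshot_at >= max_age


now = datetime(2024, 1, 2, tzinfo=timezone.utc)
an_hour_ago = now - timedelta(hours=1)
two_days_ago = now - timedelta(days=2)

assert should_snapshot(1000, an_hour_ago, now) is True   # count threshold
assert should_snapshot(3, two_days_ago, now) is True     # age threshold
assert should_snapshot(3, an_hour_ago, now) is False     # neither reached
assert should_snapshot(0, two_days_ago, now) is False    # nothing to save
```

Keeping the decision in a pure function makes it easy to tune the thresholds per document type without touching the persistence code.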
Real-time Synchronization Protocol
```python
# sync/websocket_handler.py
from typing import Any, Dict, List, Set

from fastapi import WebSocket

# Import path assumed from the file header of the bridge module above
from crdt.yjs_bridge import CRDTSyncManager, YjsUpdate


class CollaborativeDocumentManager:
    """WebSocket handler for real-time document synchronization"""

    def __init__(self, crdt_sync: CRDTSyncManager):
        self.crdt_sync = crdt_sync
        # Room -> Set of connected clients
        self.active_connections: Dict[str, Set[WebSocket]] = {}
        # Track which client is connected to which room
        self.client_rooms: Dict[WebSocket, str] = {}

    async def connect(self, websocket: WebSocket, doc_id: str, client_id: str):
        """Client joins document editing session"""
        await websocket.accept()
        if doc_id not in self.active_connections:
            self.active_connections[doc_id] = set()
        self.active_connections[doc_id].add(websocket)
        self.client_rooms[websocket] = doc_id

        # Send initial document state
        doc = self.crdt_sync.documents.get(doc_id)
        await websocket.send_json({
            "type": "initial_state",
            "updates": [u.to_dict() for u in (doc.updates if doc else [])],
            "vector_clock": doc.vector_clock if doc else {},
        })

    async def handle_client_updates(
        self,
        websocket: WebSocket,
        doc_id: str,
        client_id: str,
        updates: List[Dict[str, Any]],
    ):
        """Process updates from a client and broadcast to others"""
        # Convert JSON back to YjsUpdate objects
        yjs_updates = [
            YjsUpdate(
                origin_client_id=u["origin_client_id"],
                clock=u["clock"],
                content=bytes.fromhex(u["content"]),
                parent_clock=u.get("parent_clock", 0),
            )
            for u in updates
        ]

        # Sync with server. The empty clock means "treat everything as
        # missing"; only the ack fields of the result are used here.
        sync_result = await self.crdt_sync.sync_document(
            doc_id=doc_id,
            client_id=client_id,
            local_updates=yjs_updates,
            remote_vector_clock={},
        )

        # Broadcast to other clients
        await self.broadcast_updates(
            doc_id,
            exclude_client=websocket,
            updates=yjs_updates,
            server_vector_clock=sync_result["server_vector_clock"],
        )

        # Acknowledge receipt
        await websocket.send_json({
            "type": "ack",
            "acknowledged_count": sync_result["acknowledged"],
            "server_vector_clock": sync_result["server_vector_clock"],
        })

    async def broadcast_updates(
        self,
        doc_id: str,
        exclude_client: WebSocket,
        updates: List[YjsUpdate],
        server_vector_clock: Dict[str, int],
    ):
        """Send updates to all connected clients"""
        if doc_id not in self.active_connections:
            return

        disconnected = set()
        # Iterate over a copy: the set can change while we await a send
        for client in list(self.active_connections[doc_id]):
            if client == exclude_client:
                continue
            try:
                await client.send_json({
                    "type": "remote_updates",
                    "updates": [u.to_dict() for u in updates],
                    "server_vector_clock": server_vector_clock,
                })
            except Exception:
                disconnected.add(client)

        # Clean up disconnected clients
        for client in disconnected:
            await self.disconnect(client)

    async def disconnect(self, websocket: WebSocket):
        """Client disconnects from session"""
        doc_id = self.client_rooms.pop(websocket, None)
        if doc_id and doc_id in self.active_connections:
            self.active_connections[doc_id].discard(websocket)
            if not self.active_connections[doc_id]:
                del self.active_connections[doc_id]
```
Choosing Between Yjs and Automerge — My Selection Criteria
When you bring CRDTs into a real project, the first fork in the road is whether to adopt Yjs or Automerge. Both are excellent libraries, but they embody different design philosophies, so the question is not "which one is better" but "which one fits your requirements."
When I ran my own comparison, I used three axes: update payload and document size, history handling, and ecosystem. Let me walk through each.
Axis 1: Measure update and document size yourself
Rather than trusting catalog numbers, measure with a scenario that resembles your app's actual editing pattern. What this measurement answers is: "after a year in production, what happens to storage size and load time?"
```javascript
// bench/measure-doc-size.mjs
import * as Y from 'yjs'

const doc = new Y.Doc()
const text = doc.getText('content')

// Insert 100k characters one at a time to simulate a real editing session
for (let i = 0; i < 100_000; i++) {
  text.insert(i, 'a')
}

const snapshot = Y.encodeStateAsUpdate(doc)
console.log(`Text length: ${text.length.toLocaleString()} chars`)
console.log(`Snapshot: ${(snapshot.byteLength / 1024).toFixed(1)} KB`)
```
I insert characters one at a time because paste-heavy benchmarks hide the CRDT's metadata cost. In my own tests, for append-heavy scenarios the Yjs snapshot stayed within the same order of magnitude as the raw text. As soon as random-position inserts and deletes enter the mix, the per-character ID bookkeeping and deletion residue (the tombstones we will discuss shortly) push the metadata ratio up. Which pattern your app leans toward changes your storage assumptions.
Axis 2: How much history do you need to keep?
Automerge's defining trait is that the document itself retains the full operation history. Time travel — reconstructing the state at any point — works out of the box, which makes it a strong fit when "who changed what, and when" is an audit requirement. The trade-off is that the stored size tends to be larger for the same amount of editing, although the columnar encoding in the 2.x line improved this dramatically.
Yjs takes the opposite stance: it keeps only what convergence requires and prioritizes staying lightweight. If you need history, you store snapshots yourself.
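Storing snapshots yourself can be as small as a time-indexed list of opaque blobs. The sketch below is an assumption about how one might do it, not something Yjs provides: `SnapshotHistory`, `record`, and `at` are hypothetical names, and each blob stands in for the bytes you would get from encoding the document state on the JavaScript side.

```python
import bisect
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SnapshotHistory:
    """Keeps snapshots sorted by time so any past moment can be looked up."""
    _times: List[float] = field(default_factory=list)
    _blobs: List[bytes] = field(default_factory=list)

    def record(self, taken_at: float, blob: bytes) -> None:
        """Store a snapshot taken at `taken_at` (seconds since epoch)."""
        idx = bisect.bisect_right(self._times, taken_at)
        self._times.insert(idx, taken_at)
        self._blobs.insert(idx, blob)

    def at(self, moment: float) -> Optional[bytes]:
        """Return the latest snapshot taken at or before `moment`."""
        idx = bisect.bisect_right(self._times, moment)
        return self._blobs[idx - 1] if idx else None


history = SnapshotHistory()
history.record(20.0, b"v2")
history.record(10.0, b"v1")  # out-of-order arrival is fine

assert history.at(5.0) is None
assert history.at(10.0) == b"v1"
assert history.at(15.0) == b"v1"
assert history.at(25.0) == b"v2"
```

The granularity of your "time travel" is then exactly the snapshot interval, which is the price of keeping the live document lean.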
Axis 3: Ecosystem depth
If collaborative text editing is your main goal, this axis often becomes the practical deciding factor. Yjs ships sync and persistence providers like y-websocket and y-indexeddb, plus bindings for the major editors — ProseMirror, CodeMirror, Monaco. Whether you can avoid hand-rolling the editor integration translates directly into engineering time. On the Automerge side, automerge-repo has matured the networking story considerably, but Yjs still has the edge in editor bindings.
💡
**How I split the decision:** Yjs when collaborative text editing is the core feature. Automerge when audit history and point-in-time recovery are requirements. For small app state (settings, presence), hand-rolling a simple operation-based CRDT is also a realistic option.
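For the hand-rolled option, a last-writer-wins map is about the smallest useful design. This is a minimal sketch with hypothetical names (`LWWMap`, `Stamp`); every write carries a `(clock, client_id)` stamp, and applying an operation only takes effect if its stamp is newer than what the key already holds.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Tuple

Stamp = Tuple[int, str]  # (logical clock, client id); the id breaks clock ties


@dataclass
class LWWMap:
    entries: Dict[str, Tuple[Stamp, Any]] = field(default_factory=dict)

    def apply(self, key: str, value: Any, stamp: Stamp) -> None:
        """Apply a write; commutative and idempotent because max stamp wins."""
        current = self.entries.get(key)
        if current is None or stamp > current[0]:
            self.entries[key] = (stamp, value)

    def get(self, key: str, default: Any = None) -> Any:
        entry = self.entries.get(key)
        return entry[1] if entry else default


ops = [
    ("theme", "dark", (1, "phone")),
    ("theme", "light", (1, "tablet")),  # concurrent with the phone's write
    ("grid", 3, (2, "phone")),
]

first, second = LWWMap(), LWWMap()
for op in ops:
    first.apply(*op)
for op in reversed(ops + ops[:1]):  # different order, plus a duplicate
    second.apply(*op)

assert first.entries == second.entries
assert first.get("theme") == "light"  # (1, "tablet") > (1, "phone")
```

Note that the losing write is discarded. That is acceptable for a single-value setting such as a theme, but it is the same behavior that erased the favorites list at the start of this article, so keep it away from collections users curate.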
Tombstones and Document Bloat — The First Wall You Hit in Production
There is a problem that textbooks rarely mention but production hits you with almost immediately: document bloat.
Sequence CRDTs cannot simply drop what a user deletes. Deleted characters leave their ID metadata behind as tombstones so that late-arriving operations can still be ordered correctly. In other words, every time a user wipes the document and rewrites it, the visible text may be empty while the internal metadata keeps growing. On top of that, the naive persistence strategy of appending updates forever means each document open applies more updates than the last, and load time quietly degrades.
My approach is two layers: let Yjs's built-in garbage collection (enabled by default) do what it can, and add a compaction layer that periodically squashes the update log into a single snapshot.
```javascript
// persistence/compaction.mjs
import * as Y from 'yjs'
import Database from 'better-sqlite3'

const COMPACT_THRESHOLD = 500 // squash once this many updates accumulate

export function appendUpdate(db, docId, update) {
  db.prepare(
    'INSERT INTO updates (doc_id, payload) VALUES (?, ?)'
  ).run(docId, Buffer.from(update))

  const { count } = db.prepare(
    'SELECT COUNT(*) AS count FROM updates WHERE doc_id = ?'
  ).get(docId)

  if (count >= COMPACT_THRESHOLD) compact(db, docId)
}

function compact(db, docId) {
  const rows = db.prepare(
    'SELECT payload FROM updates WHERE doc_id = ? ORDER BY id'
  ).all(docId)

  // Squash every update into a single snapshot
  const merged = Y.mergeUpdates(rows.map((r) => new Uint8Array(r.payload)))

  const tx = db.transaction(() => {
    db.prepare('DELETE FROM updates WHERE doc_id = ?').run(docId)
    db.prepare(
      'INSERT INTO updates (doc_id, payload) VALUES (?, ?)'
    ).run(docId, Buffer.from(merged))
  })
  tx()
}
```
I use Y.mergeUpdates because it squashes the log without loading the document into a Y.Doc, which keeps the operation cheap. The delete and insert are wrapped in a transaction so a crash mid-compaction cannot lose the log. The threshold of 500 is where my own deployment settled: the number of updates applied on document open stays within a stable range, and load time stops drifting. If your documents see heavier editing, a smaller threshold is perfectly reasonable.
One caveat. Y.mergeUpdates consolidates the log but does not reclaim tombstones. If you want to go further, a periodic batch job can load the document into a Y.Doc and re-encode it with Y.encodeStateAsUpdate, producing a snapshot with garbage collection applied. Be careful, though: references to older states (history features, delta exchange with clients that have not caught up) can break, so I only run this when I can confirm every client is up to date.
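The "only when every client is up to date" check can be sketched with vector clocks in the same shape as the Python bridge earlier in this article (Yjs exposes a comparable notion as the state vector). The function name and the idea of collecting each client's last reported clock are assumptions for illustration.

```python
from typing import Dict, Iterable

VectorClock = Dict[str, int]  # origin client id -> highest clock seen


def all_clients_caught_up(
    server_clock: VectorClock,
    client_clocks: Iterable[VectorClock],
) -> bool:
    """True when every known client has seen everything the server has."""
    return all(
        client.get(origin, 0) >= seen
        for client in client_clocks
        for origin, seen in server_clock.items()
    )


server = {"phone": 12, "laptop": 7}

assert all_clients_caught_up(server, [{"phone": 12, "laptop": 7}])
assert all_clients_caught_up(server, [{"phone": 15, "laptop": 7}])  # ahead is fine
assert not all_clients_caught_up(server, [{"phone": 12, "laptop": 6}])
assert not all_clients_caught_up(server, [{"phone": 12}])  # never saw the laptop
```

An empty list of clients passes the check trivially, so in practice you would also want to decide how long a silent client counts as "known" before a garbage-collecting re-encode is allowed to run.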
AI-Powered Collaborative Features
Intelligent Conflict Resolution
While CRDTs guarantee eventual consistency, AI can select the "best" resolution from multiple mathematically valid options:
```python
# ai/conflict_resolution.py
import json
import math
from enum import Enum
from typing import Any, Dict, Optional, Sequence, Tuple


class ConflictType(Enum):
    TEXT_INSERTION = "text_insertion"
    TEXT_DELETION = "text_deletion"
    FORMATTING_CHANGE = "formatting_change"
    STRUCTURAL_CHANGE = "structural_change"


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is zero)"""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class AIConflictResolver:
    """Uses AI to intelligently resolve CRDT conflicts"""

    def __init__(self, llm, embedding_model):
        self.llm = llm
        self.embeddings = embedding_model
        self.conflict_history = []

    async def resolve_conflict(
        self,
        conflict_type: ConflictType,
        user_a_change: Dict[str, Any],
        user_b_change: Dict[str, Any],
        document_context: str,
        user_a_intent: Optional[str] = None,
        user_b_intent: Optional[str] = None,
    ) -> Tuple[Dict[str, Any], str]:
        """
        Intelligently resolve conflicts using LLM reasoning.

        CRDTs guarantee convergence, but don't understand user intent.
        AI understands intent and can select mathematically valid
        but semantically superior resolutions.
        """
        # If we have explicit intents (from user feedback), use them
        if user_a_intent and user_b_intent:
            resolution, reason = await self._resolve_by_intent(
                user_a_change, user_b_change,
                user_a_intent, user_b_intent,
                document_context,
            )
            return resolution, reason

        # Otherwise, use semantic analysis
        if conflict_type == ConflictType.TEXT_INSERTION:
            return await self._resolve_text_conflict(
                user_a_change, user_b_change, document_context
            )
        elif conflict_type == ConflictType.FORMATTING_CHANGE:
            return await self._resolve_formatting_conflict(
                user_a_change, user_b_change
            )

        # Deletions and structural changes have no AI strategy here;
        # return a fixed choice instead of falling through to None
        return user_a_change, "No AI strategy for this conflict type"

    async def _resolve_by_intent(
        self,
        change_a: Dict[str, Any],
        change_b: Dict[str, Any],
        intent_a: str,
        intent_b: str,
        context: str,
    ) -> Tuple[Dict[str, Any], str]:
        """Use LLM to determine which intent better fits document context"""
        prompt = f"""Document context: {context}

User A wants to: {intent_a}
User A's change: {json.dumps(change_a)}

User B wants to: {intent_b}
User B's change: {json.dumps(change_b)}

Which change better achieves the document's goals?
Respond in JSON: {{"choice": "a" or "b", "reason": "explanation"}}"""
        response = await self.llm.complete(prompt)
        decision = json.loads(response)
        chosen_change = change_a if decision["choice"] == "a" else change_b
        return chosen_change, decision["reason"]

    async def _resolve_text_conflict(
        self,
        insert_a: Dict[str, Any],
        insert_b: Dict[str, Any],
        document_context: str,
    ) -> Tuple[Dict[str, Any], str]:
        """Resolve conflicting text insertions using semantic coherence"""
        text_a = insert_a.get("text", "")
        text_b = insert_b.get("text", "")

        # Use embeddings to measure semantic similarity
        embed_a = await self.embeddings.embed(text_a)
        embed_b = await self.embeddings.embed(text_b)
        context_embed = await self.embeddings.embed(document_context)

        # Find which text is more semantically relevant to document
        similarity_a = cosine_similarity(embed_a, context_embed)
        similarity_b = cosine_similarity(embed_b, context_embed)

        if similarity_a > similarity_b:
            return insert_a, f"Text '{text_a}' more semantically relevant"
        else:
            return insert_b, f"Text '{text_b}' more semantically relevant"

    async def _resolve_formatting_conflict(
        self,
        format_a: Dict[str, Any],
        format_b: Dict[str, Any],
    ) -> Tuple[Dict[str, Any], str]:
        """Choose formatting based on consistency with document style"""
        # NOTE: _analyze_document_style and _score_format_compatibility
        # are not defined in this listing
        # Analyze document's existing formatting patterns
        dominant_style = await self._analyze_document_style()

        # Score each proposed formatting against dominant style
        score_a = self._score_format_compatibility(format_a, dominant_style)
        score_b = self._score_format_compatibility(format_b, dominant_style)

        if score_a > score_b:
            return format_a, f"Consistent with document style (score: {score_a})"
        else:
            return format_b, f"Consistent with document style (score: {score_b})"
```
Real-time AI Suggestions
While users edit collaboratively, AI can provide intelligent suggestions without interrupting the flow:
```python
# ai/suggestion_engine.py
import asyncio
import hashlib
import json
from typing import Any, AsyncGenerator, Dict, List, Optional


class CollaborativeEditSuggestions:
    """Generates AI suggestions during collaborative editing"""

    def __init__(self, llm, rag_engine, rate_limiter):
        self.llm = llm
        self.rag = rag_engine  # For knowledge base retrieval
        self.rate_limiter = rate_limiter
        self.suggestion_cache = {}

    async def stream_suggestions(
        self,
        current_text: str,
        document_context: str,
        user_role: str,
        editing_history: List[Dict[str, Any]],
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Stream suggestions as user edits.

        Uses debouncing and caching to avoid overwhelming network.
        """
        # Debounce: wait for editing to pause
        await asyncio.sleep(0.5)

        # Check cache
        cache_key = hashlib.md5(current_text.encode()).hexdigest()
        if cache_key in self.suggestion_cache:
            for suggestion in self.suggestion_cache[cache_key]:
                yield suggestion
            return

        suggestions = []

        # Rate limit to prevent excessive LLM calls
        if not await self.rate_limiter.allow_request(f"suggestions_{user_role}"):
            return

        # Parallel suggestion generation
        tasks = [
            self._generate_grammar_suggestions(current_text),
            self._generate_clarity_suggestions(current_text, document_context),
            self._generate_continuation_suggestions(current_text, editing_history),
            self._generate_reference_suggestions(current_text, document_context),
        ]

        for coro in asyncio.as_completed(tasks):
            try:
                suggestion = await coro
            except Exception:
                # One failed generator (bad JSON, LLM error) should not
                # end the whole stream
                continue
            if suggestion:
                suggestions.append(suggestion)
                yield suggestion

        # Cache for future identical edits
        self.suggestion_cache[cache_key] = suggestions

    async def _generate_grammar_suggestions(
        self, text: str
    ) -> Optional[Dict[str, Any]]:
        """Suggest grammar and style improvements"""
        prompt = f"""Review this text for grammar, clarity, and style issues:

"{text}"

Respond in JSON format:
{{
  "issues": [
    {{"position": 0, "issue": "description", "suggestion": "fix", "severity": "low|medium|high"}}
  ]
}}

Only include real issues, no false positives."""
        response = await self.llm.complete(prompt)
        issues = json.loads(response).get("issues", [])
        if issues:
            return {"type": "grammar", "issues": issues, "priority": 1}
        return None

    async def _generate_clarity_suggestions(
        self, text: str, document_context: str
    ) -> Optional[Dict[str, Any]]:
        """Suggest improvements for clarity and brevity"""
        prompt = f"""Document context: {document_context}

Current text: "{text}"

Can this be clearer, shorter, or more direct?
Respond in JSON:
{{
  "suggestions": [
    {{"original": "phrase", "improved": "better version", "reason": "why"}}
  ]
}}"""
        response = await self.llm.complete(prompt)
        suggestions = json.loads(response).get("suggestions", [])
        if suggestions:
            return {"type": "clarity", "suggestions": suggestions, "priority": 2}
        return None

    async def _generate_continuation_suggestions(
        self, text: str, editing_history: List[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Suggest what to write next based on patterns"""
        # Analyze what the user typically writes after similar phrases
        similar_patterns = self._find_similar_patterns(text, editing_history)
        if not similar_patterns:
            return None

        # Generate continuations based on patterns
        prompt = f"""Based on previous writing patterns similar to "{text}", what should be written next?

Previous continuations: {json.dumps(similar_patterns)}

Suggest 3 natural continuations.
Respond in JSON: {{"continuations": ["suggestion 1", "suggestion 2", "suggestion 3"]}}"""
        response = await self.llm.complete(prompt)
        continuations = json.loads(response).get("continuations", [])
        if continuations:
            return {
                "type": "continuation",
                "suggestions": continuations,
                "priority": 3,
            }
        return None

    async def _generate_reference_suggestions(
        self, text: str, document_context: str
    ) -> Optional[Dict[str, Any]]:
        """Suggest relevant references from knowledge base"""
        # Retrieve relevant documents
        relevant_docs = await self.rag.retrieve(text, top_k=3)
        if not relevant_docs:
            return None
        return {
            "type": "reference",
            "references": [
                {
                    "title": doc["title"],
                    "url": doc["url"],
                    "snippet": doc["snippet"],
                    "relevance": doc["relevance_score"],
                }
                for doc in relevant_docs
            ],
            "priority": 4,
        }

    def _find_similar_patterns(
        self, current_phrase: str, history: List[Dict[str, Any]]
    ) -> List[str]:
        """Find similar editing patterns in user history"""
        similar = []
        for entry in history:
            if self._phrases_similar(current_phrase, entry.get("text", "")):
                if entry.get("next_text"):
                    similar.append(entry["next_text"])
        return similar[-5:]  # Last 5 similar patterns

    def _phrases_similar(self, phrase1: str, phrase2: str) -> bool:
        """Check if phrases are similar"""
        # Simple implementation: check word overlap
        words1 = set(phrase1.lower().split())
        words2 = set(phrase2.lower().split())
        if not words1 or not words2:
            return False  # avoids dividing by zero on empty phrases
        overlap = len(words1 & words2) / max(len(words1), len(words2))
        return overlap > 0.5
```
Presence and Awareness Features
Collaborative editing requires awareness of other users in real-time:
```python
# presence/awareness_engine.py
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional


class PresenceState(Enum):
    IDLE = "idle"
    EDITING = "editing"
    VIEWING = "viewing"
    OFFLINE = "offline"


@dataclass
class UserPresence:
    """Represents a user's current state in a document"""
    client_id: str
    user_id: str
    user_name: str
    color: str  # For UI cursor color
    cursor_position: int
    selection_start: int
    selection_end: int
    state: PresenceState
    last_activity: datetime


class AwarenessEngine:
    """Manages user presence and awareness in collaborative documents"""

    def __init__(self):
        # doc_id -> {client_id -> UserPresence}
        self.doc_presences: Dict[str, Dict[str, UserPresence]] = {}

    def update_presence(
        self, doc_id: str, client_id: str, presence: UserPresence
    ) -> Dict[str, UserPresence]:
        """Update a user's presence and return all presences in doc"""
        if doc_id not in self.doc_presences:
            self.doc_presences[doc_id] = {}
        self.doc_presences[doc_id][client_id] = presence
        return self.doc_presences[doc_id]

    def get_active_users(
        self, doc_id: str, exclude_client: Optional[str] = None
    ) -> List[UserPresence]:
        """Get all active users except specified client"""
        if doc_id not in self.doc_presences:
            return []
        users = [
            presence
            for presence in self.doc_presences[doc_id].values()
            if presence.state != PresenceState.OFFLINE
        ]
        if exclude_client:
            users = [u for u in users if u.client_id != exclude_client]
        return users

    def get_awareness_info(self, doc_id: str, for_client: str) -> Dict[str, Any]:
        """Get awareness information to send to a client"""
        active_users = self.get_active_users(doc_id, exclude_client=for_client)
        return {
            "active_users": [
                {
                    "client_id": u.client_id,
                    "user_name": u.user_name,
                    "color": u.color,
                    "cursor_position": u.cursor_position,
                    "selection": {
                        "start": u.selection_start,
                        "end": u.selection_end,
                    },
                    "state": u.state.value,
                    "last_activity": u.last_activity.isoformat(),
                }
                for u in active_users
            ]
        }

    async def handle_inactivity(self, doc_id: str, timeout_seconds: int = 300):
        """Mark inactive users as offline"""
        if doc_id not in self.doc_presences:
            return

        now = datetime.utcnow()
        expired_clients = []
        for client_id, presence in self.doc_presences[doc_id].items():
            elapsed = (now - presence.last_activity).total_seconds()
            if elapsed > timeout_seconds:
                presence.state = PresenceState.OFFLINE
            # Decide removal per client, using that client's own elapsed time
            if elapsed > timeout_seconds * 2:
                expired_clients.append(client_id)

        # Clean up offline clients after extended period
        for client_id in expired_clients:
            del self.doc_presences[doc_id][client_id]
```
Keeping AI Conflict Resolution Out of the Sync Path — Three Design Decisions
Before wiring AI conflict resolution like the above into production, there are decisions worth locking in first. These are the three I settled on.
1. Sync stays sync; AI follows behind. A CRDT merge completes in tens of milliseconds, while an LLM call takes hundreds of milliseconds to several seconds. Putting the AI's judgment inside the sync path throws away the instant responsiveness that makes CRDTs attractive in the first place. So the CRDT merge result is applied immediately, and the AI runs in an asynchronous review lane that detects semantic clashes and proposes fixes after the fact. From the user's perspective, edits sync instantly, and a few seconds later a suggestion arrives: "two people's intents collided in this paragraph."
2. Narrow the escalation criteria. Calling an LLM for every concurrent edit is not economically realistic. I use heuristics to detect only the patterns where intent collisions are likely — simultaneous edits to the same paragraph, overlapping deletes and inserts — and hand just those to the AI. In my deployment, this pre-filter kept LLM calls down to a small fraction of all concurrent-edit events.
3. The AI can fail without breaking convergence. LLM APIs go down. Rate limits happen. A design where sync stalls because the AI lane stalled defeats the whole purpose. Fix the invariant first: even if the AI lane is completely dead, the document stays correct on CRDT convergence alone. The AI is an enhancement layer you can unplug, not a dependency.
Settle these three up front and AI integration stops being a risky modification and becomes an enhancement you can remove without consequences. The more constrained your operational resources — and as a solo developer, mine certainly are — the more this separation pays off.
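The three decisions fit together in very little code. The following is a sketch under assumptions, not my production implementation: the edit dictionaries, `needs_review`, `on_concurrent_edits`, and `review_worker` are hypothetical names, and appending to a list stands in for the real CRDT merge.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Edit = Dict[str, Any]
Reviewer = Callable[[Edit, Edit], Awaitable[str]]


def needs_review(a: Edit, b: Edit) -> bool:
    """Decision 2: escalate only same-paragraph edits by different authors."""
    return a["paragraph"] == b["paragraph"] and a["author"] != b["author"]


def on_concurrent_edits(applied: List[Edit], a: Edit, b: Edit,
                        queue: "asyncio.Queue[Tuple[Edit, Edit]]") -> None:
    """Decision 1: apply the merge right away, then maybe enqueue a review."""
    applied.extend([a, b])  # stand-in for the CRDT merge; never waits on AI
    if needs_review(a, b):
        try:
            queue.put_nowait((a, b))
        except asyncio.QueueFull:
            pass  # a saturated AI lane drops reviews, never edits


async def review_worker(queue: "asyncio.Queue[Tuple[Edit, Edit]]",
                        reviewer: Reviewer, suggestions: List[str],
                        timeout: float = 5.0) -> None:
    """Decision 3: reviewer errors and timeouts are swallowed."""
    while True:
        a, b = await queue.get()
        try:
            suggestions.append(await asyncio.wait_for(reviewer(a, b), timeout))
        except Exception:
            pass  # the document is already correct without the AI
        finally:
            queue.task_done()


async def demo() -> Tuple[List[Edit], List[str]]:
    queue: "asyncio.Queue[Tuple[Edit, Edit]]" = asyncio.Queue(maxsize=100)
    applied: List[Edit] = []
    suggestions: List[str] = []

    async def flaky_reviewer(a: Edit, b: Edit) -> str:
        if a["paragraph"] == 2:
            raise RuntimeError("LLM API down")
        return f"intents may collide in paragraph {a['paragraph']}"

    worker = asyncio.create_task(review_worker(queue, flaky_reviewer, suggestions))
    on_concurrent_edits(applied, {"author": "a", "paragraph": 1},
                        {"author": "b", "paragraph": 1}, queue)
    on_concurrent_edits(applied, {"author": "a", "paragraph": 2},
                        {"author": "b", "paragraph": 2}, queue)
    on_concurrent_edits(applied, {"author": "a", "paragraph": 3},
                        {"author": "b", "paragraph": 4}, queue)
    await queue.join()
    worker.cancel()
    return applied, suggestions


applied, suggestions = asyncio.run(demo())
assert len(applied) == 6  # every edit landed, AI or no AI
assert suggestions == ["intents may collide in paragraph 1"]
```

The second pair hits a failing reviewer and the third pair is never escalated, yet all six edits are applied. Deleting the worker entirely changes nothing about the document, which is the invariant the third decision asks for.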
```python
# tests/test_crdt.py
import asyncio

import pytest

# Import paths assumed from the file headers of the listings above
from ai.conflict_resolution import AIConflictResolver, ConflictType
from crdt.yjs_bridge import CollaborativeDocument, CRDTSyncManager, YjsUpdate


# Minimal test doubles (stand-ins; not part of the listings above)
class MockStorage:
    async def save_document(self, doc):
        pass


class MockEmbeddings:
    async def embed(self, text):
        # Deterministic toy vector so repeated runs give identical results
        return [float(len(text)), 1.0]


mock_llm = None  # not called on the embedding-based path exercised below
mock_embeddings = MockEmbeddings()


def get_document_content(doc):
    """Stand-in for decoding the document: join updates in stored order"""
    return b"".join(u.content for u in doc.updates)


@pytest.mark.asyncio
async def test_concurrent_inserts_converge():
    """Test that concurrent text inserts converge to same state"""
    doc1 = CollaborativeDocument("doc1", "client1")
    doc2 = CollaborativeDocument("doc1", "client2")

    # Create concurrent updates
    update1 = YjsUpdate(
        origin_client_id="client1", clock=1, content=b"insert_at_0_hello"
    )
    update2 = YjsUpdate(
        origin_client_id="client2", clock=1, content=b"insert_at_0_world"
    )

    # Apply in different orders
    doc1.merge_remote_update(update1)
    doc1.merge_remote_update(update2)
    doc2.merge_remote_update(update2)
    doc2.merge_remote_update(update1)

    # Both should have same final state
    assert get_document_content(doc1) == get_document_content(doc2)


@pytest.mark.asyncio
async def test_conflict_resolution_deterministic():
    """Test that conflict resolution always produces same result"""
    # Deterministic only because the embedding mock is; a live model may vary
    resolver = AIConflictResolver(mock_llm, mock_embeddings)
    conflict_a = {"text": "Hello", "position": 0}
    conflict_b = {"text": "Hi", "position": 0}

    results = []
    for _ in range(5):
        result, reason = await resolver.resolve_conflict(
            ConflictType.TEXT_INSERTION,
            conflict_a,
            conflict_b,
            "document about greetings",
        )
        results.append(result)

    # All results should be identical
    assert all(r == results[0] for r in results)


@pytest.mark.asyncio
async def test_sync_performance_at_scale():
    """Smoke-test sync with many concurrent editors"""
    manager = CRDTSyncManager(MockStorage())
    doc_id = "perf-test-doc"

    # Simulate 100 users making edits
    async def simulate_user_edits(user_id: int):
        updates = [
            YjsUpdate(
                origin_client_id=f"client_{user_id}",
                clock=i,
                content=f"edit_{i}".encode(),
            )
            for i in range(1, 11)  # clocks start at 1; 0 means "seen nothing"
        ]
        await manager.sync_document(doc_id, f"client_{user_id}", updates, {})

    await asyncio.gather(*[simulate_user_edits(i) for i in range(100)])

    # Every client's edits should be reflected in the server's vector clock
    assert len(manager.documents[doc_id].vector_clock) == 100
```
Closing Thoughts — What to Leave to Math, What to Support with Design
What a CRDT guarantees is convergence, not correctness. Every replica ending up in the same state and that state being what users actually want are two different problems.
That is why I believe the heart of the design is the separation of concerns. Leave ordering and network delays to the math. Support the "converges, but operations suffer" territory — document bloat — with compaction. Cover the "converges, but nobody is happy" territory — colliding intents — with an AI lane that lives outside the sync path. Since I started thinking in these three layers, I find myself far less stuck on collaborative-editing design questions.
As a next step, I recommend the smallest possible experiment: wire up Yjs with y-websocket, sync two browser tabs, take one offline, edit, and watch the merge on reconnect with your own eyes. Even tombstone bloat becomes tangible in that tiny setup if you keep an eye on the size of Y.encodeStateAsUpdate. The fastest way to graduate from the "sync that overwrites data" I described at the beginning is to internalize that feeling of convergence once, hands-on.
Thank you for reading all the way through. If you are wrestling with sync design the way I was, I hope this gives you a useful starting point.
Thank You for Reading
Antigravity Lab is ad-free, supported entirely by members like you. We publish practical guides daily with implementation code, benchmarks, and production-ready patterns. If you've found it useful, we'd love to have you on board.