App Development · 2026-03-14 · Advanced

Real-Time Collaborative Editing with CRDTs and AI — Sync Design That Resolves Conflicts with Math

Starting from a sync bug that silently erased user data, this deep dive covers CRDT-based conflict-free sync design: choosing between Yjs and Automerge, compaction against tombstone bloat, and keeping AI conflict resolution out of the sync path.

Tags: CRDTs, Collaborative Editing, Real-time Sync, Conflict Resolution, Yjs, Offline-first

I once shipped multi-device sync for a favorites list using the simplest possible rule: the last device to save wins. This was in one of the wallpaper apps I build and run as an indie developer.

On paper, the design looked harmless. In practice, a user would curate their favorites offline on the train, reconnect to Wi-Fi at home, and watch the app silently overwrite everything they had added on another device. By the time the reports surfaced in reviews, the data was gone for good. Unlike a crash, a sync bug like this barely leaves a trace in the logs. I still remember the cold sweat.

That painful lesson is what pushed me to study CRDTs (Conflict-free Replicated Data Types) seriously. The "edit together without breaking anything" experience that Google Docs, Figma, and Notion make look effortless is not magic; it is mathematics. By making operations commutative and idempotent, CRDTs let every replica converge to the same state without a central arbiter. Once that property clicks, you never look at sync design the same way again.

In the sections that follow, we will work through how CRDTs make collaborative editing possible, a production-grade implementation built around Yjs, how to choose between Yjs and Automerge, the document-bloat problem you will inevitably hit in operation, and the design decisions involved in adding AI to the merge process — all with working code.

Understanding CRDTs: The Foundation

The Core Problem CRDTs Solve

In traditional client-server systems, a single server coordinates all changes. This works until you need offline functionality, P2P collaboration, or low-latency experiences. The moment you remove the server's coordinating power, you face the fundamental challenge: how do you merge concurrent edits?

User A: "Hello"  → edits at position 0, inserts "Hi"  → "HiHello"
User B: "Hello"  → edits at position 0, inserts "Hey" → "HeyHello"

Question: When both changes sync, what's the final state?
Traditional approach: Server decides (requires round-trip)
CRDT approach: Mathematical rules ensure convergence

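CRDTs solve this by making operations commutative and idempotent. Commutativity means the order in which operations arrive doesn't matter; idempotence means a duplicated delivery changes nothing. The final state is therefore identical on every replica, regardless of network delays, retries, or concurrent edits.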
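
To make those two properties concrete, here is a minimal sketch of a grow-only counter (G-Counter), one of the simplest state-based CRDTs. The function names and the replica IDs `phone` and `laptop` are illustrative assumptions, not part of any library.

```python
from typing import Dict

State = Dict[str, int]  # replica_id -> number of increments made by that replica


def increment(state: State, replica_id: str, amount: int = 1) -> State:
    """Return a new state where `replica_id` has counted `amount` more."""
    new_state = dict(state)
    new_state[replica_id] = new_state.get(replica_id, 0) + amount
    return new_state


def merge(a: State, b: State) -> State:
    """Join two states by taking the per-replica maximum."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}


def value(state: State) -> int:
    """The counter's value is the sum of every replica's contribution."""
    return sum(state.values())


# Two devices count independently while offline.
phone = increment({}, "phone", 2)
laptop = increment({}, "laptop", 3)

merged_ab = merge(phone, laptop)         # phone receives laptop's state
merged_ba = merge(laptop, phone)         # laptop receives phone's state
merged_twice = merge(merged_ab, laptop)  # the same state delivered again
```

Because `merge` is a per-key maximum, swapping the arguments or merging the same state twice gives the same result. That is what lets replicas sync in any order and tolerate retries.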

CRDT Types and Their Trade-offs

┌─────────────────────────────────────────────────────────┐
│ OPERATION-BASED (Commutative Replicated Data Types)    │
├─────────────────────────────────────────────────────────┤
│ Broadcast operations → operations automatically          │
│ commute/compose                                         │
│ Best for: Counter, Set, Flag operations                │
│ Network overhead: Low                                   │
│ Complexity: Medium                                      │
│ Example: Increment counter by 5 (order doesn't matter) │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ STATE-BASED (Convergent Replicated Data Types)         │
├─────────────────────────────────────────────────────────┤
│ Broadcast state → merge happens at destination          │
│ Best for: Complex documents, trees                      │
│ Network overhead: High                                  │
│ Complexity: Lower for simple merges                     │
│ Example: Last-write-wins (but loses data)              │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ SEQUENCE CRDTs (WOOT, RGA, Yjs)                        │
├─────────────────────────────────────────────────────────┤
│ Specialized for text/ordered sequences                 │
│ Assign unique IDs to each character                    │
│ Best for: Text editors, lists                          │
│ Network overhead: Very high initially                  │
│ Complexity: High                                       │
│ Example: Every character gets (siteID, clock) ID      │
└─────────────────────────────────────────────────────────┘

💡 **Choosing a CRDT:** For text editing, use sequence CRDTs (Yjs, Automerge). For application state (user presence, cursors, selections), use operation-based CRDTs. Hybrid approaches combine both.
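
The "(siteID, clock) ID per character" idea can be sketched in a few dozen lines. What follows is a simplified RGA-style insert rule, not the algorithm Yjs actually uses; the `Replica` and `Insert` names are assumptions made for this illustration. It handles inserts only (no deletions or tombstones) and assumes causal delivery, meaning an element's parent always arrives before the element itself.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

ElemId = Tuple[int, str]  # (Lamport clock, site ID)


@dataclass(frozen=True)
class Insert:
    id: ElemId
    parent: Optional[ElemId]  # element this one follows; None = document start
    char: str


class Replica:
    def __init__(self, site_id: str) -> None:
        self.site_id = site_id
        self.clock = 0
        self.elems: List[Insert] = []

    def local_insert(self, parent: Optional[ElemId], char: str) -> Insert:
        self.clock += 1
        op = Insert((self.clock, self.site_id), parent, char)
        self.integrate(op)
        return op

    def integrate(self, op: Insert) -> None:
        if any(e.id == op.id for e in self.elems):
            return  # already applied: redelivery is a no-op
        self.clock = max(self.clock, op.id[0])
        idx = 0
        if op.parent is not None:
            # Assumes causal delivery: the parent is already present.
            idx = next(i for i, e in enumerate(self.elems) if e.id == op.parent) + 1
        # Skip concurrent siblings (and their subtrees) that carry a larger ID.
        while idx < len(self.elems) and self.elems[idx].id > op.id:
            idx += 1
        self.elems.insert(idx, op)

    def type_text(self, text: str, parent: Optional[ElemId] = None) -> List[Insert]:
        ops = []
        for ch in text:
            op = self.local_insert(parent, ch)
            ops.append(op)
            parent = op.id  # each character follows the previous one
        return ops

    def text(self) -> str:
        return "".join(e.char for e in self.elems)


a, b = Replica("A"), Replica("B")
for op in a.type_text("Hello"):
    b.integrate(op)  # both replicas start from "Hello"

ops_a = a.type_text("Hi")   # A inserts at position 0
ops_b = b.type_text("Hey")  # B concurrently inserts at position 0

# Exchange the operations; each side applies them in a different order.
for op in ops_b:
    a.integrate(op)
for op in ops_a:
    b.integrate(op)
```

With these site IDs both replicas end at `"HeyHiHello"`: the tie between the two inserts at position 0 is broken by comparing IDs, so neither side needs to ask a server who goes first, and the two words do not interleave.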

Implementing Yjs: The Production CRDT

Yjs is one of the most widely adopted CRDT libraries for collaborative text editing, with maintained bindings for editors such as ProseMirror, CodeMirror, Quill, and Monaco. Understanding Yjs teaches patterns that apply to any CRDT.

Core Architecture

# crdt/yjs_bridge.py
"""
Bridge between Python backend and Yjs (JavaScript CRDT).
In production, Yjs runs on client; we sync to server storage.
"""
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
import json
 
@dataclass
class YjsUpdate:
    """Represents a change in a Yjs document"""
    origin_client_id: str
    clock: int  # Lamport clock for ordering
    content: bytes  # Encoded Yjs update
    # datetime.utcnow() is deprecated since Python 3.12; store an aware UTC time
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    parent_clock: int = 0  # For causal ordering
 
    def to_dict(self) -> Dict[str, Any]:
        return {
            "origin_client_id": self.origin_client_id,
            "clock": self.clock,
            "content": self.content.hex(),  # Hex-encoded so bytes fit in JSON
            "timestamp": self.timestamp.isoformat(),
            "parent_clock": self.parent_clock
        }
 
@dataclass
class CollaborativeDocument:
    """A document managed by CRDT synchronization"""
    doc_id: str
    client_id: str
    updates: List[YjsUpdate] = field(default_factory=list)
    vector_clock: Dict[str, int] = field(default_factory=dict)
    last_synced: Optional[datetime] = None
 
    def add_update(self, update: YjsUpdate) -> None:
        """Add a local change to the document"""
        self.updates.append(update)
        # Update vector clock for this client
        self.vector_clock[self.client_id] = (
            self.vector_clock.get(self.client_id, 0) + 1
        )
 
    def get_missing_updates(
        self,
        remote_vector_clock: Dict[str, int]
    ) -> List[YjsUpdate]:
        """Determine which updates haven't been seen by remote client"""
        missing = []
 
        for update in self.updates:
            remote_clock = remote_vector_clock.get(
                update.origin_client_id, 0
            )
            if update.clock > remote_clock:
                missing.append(update)
 
        return missing
 
    def merge_remote_update(self, update: YjsUpdate) -> None:
        """Incorporate a change from another client"""
        # Ignore redelivered updates so that merging stays idempotent
        already_stored = any(
            u.origin_client_id == update.origin_client_id
            and u.clock == update.clock
            for u in self.updates
        )
        if already_stored:
            return
 
        # Insert update in clock order
        insert_pos = len(self.updates)
        for i, existing in enumerate(self.updates):
            if existing.clock > update.clock:
                insert_pos = i
                break
 
        self.updates.insert(insert_pos, update)
 
        # Update our vector clock
        self.vector_clock[update.origin_client_id] = max(
            self.vector_clock.get(update.origin_client_id, 0),
            update.clock
        )
 
# Document synchronization protocol
class CRDTSyncManager:
    """Manages CRDT document synchronization across clients"""
 
    def __init__(self, storage_backend, max_batch_size: int = 100):
        self.storage = storage_backend
        self.max_batch_size = max_batch_size
        self.documents: Dict[str, CollaborativeDocument] = {}
 
    async def sync_document(
        self,
        doc_id: str,
        client_id: str,
        local_updates: List[YjsUpdate],
        remote_vector_clock: Dict[str, int]
    ) -> Dict[str, Any]:
        """
        Sync a document between client and server.
 
        The server acts as a "causal sequencer" but NOT a coordinator.
        All merge decisions are deterministic based on vector clocks.
        """
        doc = self.documents.get(doc_id)
        if not doc:
            doc = CollaborativeDocument(doc_id=doc_id, client_id=client_id)
            self.documents[doc_id] = doc
 
        # Merge incoming updates
        for update in local_updates:
            doc.merge_remote_update(update)
 
        # Find updates to send back
        missing_updates = doc.get_missing_updates(remote_vector_clock)
 
        # Batch for efficiency
        update_batches = [
            missing_updates[i:i + self.max_batch_size]
            for i in range(0, len(missing_updates), self.max_batch_size)
        ]
 
        # Persist to storage
        await self.storage.save_document(doc)
 
        return {
            "doc_id": doc_id,
            "acknowledged": len(local_updates),
            "updates_to_apply": update_batches,
            "server_vector_clock": doc.vector_clock,
            "needs_full_sync": self._should_trigger_full_sync(doc)
        }
 
    def _should_trigger_full_sync(self, doc: CollaborativeDocument) -> bool:
        """Detect situations requiring full document state"""
        # If document has grown very large, send state instead of ops
        total_update_size = sum(
            len(u.content) for u in doc.updates
        )
        return total_update_size > 10_000_000  # 10MB threshold
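
The sync manager above leans on vector clocks to decide what each side is missing. As a rough illustration of the underlying comparison, the sketch below classifies two clocks as equal, ordered, or concurrent. The `compare` helper and its return labels are assumptions for this example and are not part of the bridge code.

```python
from typing import Dict

VectorClock = Dict[str, int]  # client_id -> highest clock seen from that client


def compare(a: VectorClock, b: VectorClock) -> str:
    """Classify how clock `a` relates to clock `b`."""
    keys = a.keys() | b.keys()
    a_le_b = all(a.get(k, 0) <= b.get(k, 0) for k in keys)
    b_le_a = all(b.get(k, 0) <= a.get(k, 0) for k in keys)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # b has seen everything a has seen, and more
    if b_le_a:
        return "after"       # a has seen everything b has seen, and more
    return "concurrent"      # each side has seen something the other has not


behind = compare({"phone": 2}, {"phone": 2, "laptop": 1})
diverged = compare({"phone": 3}, {"laptop": 1})
```

Only the `"concurrent"` case involves a real conflict. In the other cases one replica is simply behind and needs the updates it has not seen yet.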
⚠️ **Storage Implications:** Storing every raw CRDT update forever bloats storage. Implement snapshots: periodically save the document's complete state, then discard the updates that state already covers. For example, take a snapshot every 1,000 updates or once a day.
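
A minimal sketch of that snapshot policy might look like the following. The `CompactingLog` class and its `merge_updates` hook are hypothetical names introduced here. In the JavaScript Yjs library, `Y.mergeUpdates` is the function that plays this role; on a Python backend you would need an equivalent from whichever binding you use. The lambda in the demo is only a stand-in, since real CRDT updates cannot simply be concatenated.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class CompactingLog:
    # Hypothetical hook: merges several encoded updates into one equivalent update.
    merge_updates: Callable[[List[bytes]], bytes]
    snapshot_every: int = 1000
    snapshot: bytes = b""
    pending: List[bytes] = field(default_factory=list)

    def append(self, update: bytes) -> bool:
        """Store an update; return True if this append triggered a compaction."""
        self.pending.append(update)
        if len(self.pending) >= self.snapshot_every:
            self.compact()
            return True
        return False

    def compact(self) -> None:
        """Fold the previous snapshot and all pending updates into a new snapshot."""
        parts = ([self.snapshot] if self.snapshot else []) + self.pending
        if parts:
            self.snapshot = self.merge_updates(parts)
        self.pending = []

    def load(self) -> List[bytes]:
        """Everything a fresh client must apply: snapshot first, then the tail."""
        return ([self.snapshot] if self.snapshot else []) + self.pending


# Stand-in merge for demonstration only.
log = CompactingLog(merge_updates=lambda parts: b"".join(parts), snapshot_every=3)
triggered = [log.append(bytes([i])) for i in range(4)]
```

The third append crosses the threshold and folds everything into one snapshot, so a client joining afterwards loads one snapshot plus a short tail instead of the full history.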

Real-time Synchronization Protocol

# sync/websocket_handler.py
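import asyncio
import json
from typing import Any, Dict, List, Set

from fastapi import WebSocket, WebSocketDisconnect

# Assumes the bridge module shown earlier is importable as crdt.yjs_bridge
from crdt.yjs_bridge import CRDTSyncManager, YjsUpdate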
 
class CollaborativeDocumentManager:
    """WebSocket handler for real-time document synchronization"""
 
    def __init__(self, crdt_sync: CRDTSyncManager):
        self.crdt_sync = crdt_sync
        # Room -> Set of connected clients
        self.active_connections: Dict[str, Set[WebSocket]] = {}
        # Track which client is connected to which room
        self.client_rooms: Dict[WebSocket, str] = {}
 
    async def connect(self, websocket: WebSocket, doc_id: str, client_id: str):
        """Client joins document editing session"""
        await websocket.accept()
 
        if doc_id not in self.active_connections:
            self.active_connections[doc_id] = set()
 
        self.active_connections[doc_id].add(websocket)
        self.client_rooms[websocket] = doc_id
 
        # Send initial document state
        doc = self.crdt_sync.documents.get(doc_id)
        await websocket.send_json({
            "type": "initial_state",
            "updates": [u.to_dict() for u in (doc.updates if doc else [])],
            "vector_clock": doc.vector_clock if doc else {}
        })
 
    async def handle_client_updates(
        self,
        websocket: WebSocket,
        doc_id: str,
        client_id: str,
        updates: List[Dict[str, Any]]
    ):
        """Process updates from a client and broadcast to others"""
        # Convert JSON back to YjsUpdate objects
        yjs_updates = [
            YjsUpdate(
                origin_client_id=u["origin_client_id"],
                clock=u["clock"],
                content=bytes.fromhex(u["content"]),
                parent_clock=u.get("parent_clock", 0)
            )
            for u in updates
        ]
 
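        # Sync with server.
        # NOTE: an empty remote clock makes the server treat its whole
        # backlog as "missing"; that part of the result is unused here.
        # Pass the client's real vector clock if the reply should carry it.
        sync_result = await self.crdt_sync.sync_document(
            doc_id=doc_id,
            client_id=client_id,
            local_updates=yjs_updates,
            remote_vector_clock={}
        )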
 
        # Broadcast to other clients
        await self.broadcast_updates(
            doc_id,
            exclude_client=websocket,
            updates=yjs_updates,
            server_vector_clock=sync_result["server_vector_clock"]
        )
 
        # Acknowledge receipt
        await websocket.send_json({
            "type": "ack",
            "acknowledged_count": sync_result["acknowledged"],
            "server_vector_clock": sync_result["server_vector_clock"]
        })
 
    async def broadcast_updates(
        self,
        doc_id: str,
        exclude_client: WebSocket,
        updates: List[YjsUpdate],
        server_vector_clock: Dict[str, int]
    ):
        """Send updates to all connected clients"""
        if doc_id not in self.active_connections:
            return
 
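        disconnected = set()
 
        # Iterate over a copy: connect()/disconnect() can mutate the set
        # while this coroutine is suspended inside send_json
        for client in list(self.active_connections[doc_id]):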
            if client == exclude_client:
                continue
 
            try:
                await client.send_json({
                    "type": "remote_updates",
                    "updates": [u.to_dict() for u in updates],
                    "server_vector_clock": server_vector_clock
                })
            except Exception:
                disconnected.add(client)
 
        # Clean up disconnected clients
        for client in disconnected:
            await self.disconnect(client)
 
    async def disconnect(self, websocket: WebSocket):
        """Client disconnects from session"""
        doc_id = self.client_rooms.pop(websocket, None)
        if doc_id and doc_id in self.active_connections:
            self.active_connections[doc_id].discard(websocket)
            if not self.active_connections[doc_id]:
                del self.active_connections[doc_id]
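
One detail of the handler above worth measuring is the wire encoding: every update travels as a hex string inside JSON. The sketch below compares hex with Base64 for a 300-byte payload. The payload is an arbitrary stand-in, not a real Yjs update.

```python
import base64
import json

# 300 arbitrary bytes standing in for an encoded update
payload = bytes(range(256)) + bytes(44)

as_hex = payload.hex()
as_b64 = base64.b64encode(payload).decode("ascii")

# Both encodings survive a JSON round trip unchanged.
wire = json.loads(json.dumps({"hex": as_hex, "b64": as_b64}))
decoded_hex = bytes.fromhex(wire["hex"])
decoded_b64 = base64.b64decode(wire["b64"])

hex_len = len(as_hex)   # two characters per byte
b64_len = len(as_b64)   # four characters per three bytes
```

Hex doubles the payload (600 characters here) while Base64 adds about a third (400 characters). If bandwidth matters, binary WebSocket frames avoid the overhead entirely; Starlette's `WebSocket.send_bytes` is one way to send them, at the cost of moving the metadata out of the JSON envelope.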

