Managing Concurrent Delta Lake Writes from a Multi-User Web Application


The core challenge began with a simple, yet deceptive, requirement: allow our data science team to collaboratively clean and annotate a multi-terabyte dataset directly within our data lakehouse. The dataset, stored in Delta Lake format, is the source of truth for dozens of downstream analytics and machine learning pipelines. Moving slices of this data into a traditional OLTP database for editing and then syncing it back was a non-starter; it would introduce unacceptable latency, potential data drift, and significant architectural complexity. The mandate was clear: edit the data in-place.

This immediately pitted the typical interaction model of a web application—small, frequent, low-latency writes from multiple concurrent users—against the fundamental design of a data lake table format. Delta Lake provides phenomenal ACID guarantees for large-scale data manipulation, but it achieves this through an optimistic concurrency control mechanism. It assumes that concurrent writes to the same location are infrequent exceptions, not the norm. Our React-based editing interface, served by a Flask backend, was designed to create exactly this “exception” scenario as its primary mode of operation. The initial, naive implementation failed spectacularly under the slightest concurrent load.

The problem manifested as a cascade of errors like pyarrow.lib.ArrowIOError: "The AWS S3 HeadObject API returned 'Not Found' for key... An object may have been simultaneously deleted by a different writer." This cryptic message from the underlying S3 storage layer was a symptom of a race condition. In Delta Lake’s protocol, a transaction commits by writing a new JSON file to the _delta_log directory. If two writers attempt to create the same 000...N.json commit file, only one will succeed. The second writer, upon discovering that its intended commit file was preempted, fails. The delta-rs library translates this into a ConcurrentWriteException. Our Flask application was simply letting these exceptions bubble up, resulting in a failed API call and a frustrated user.
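
To make the collision concrete, here is a minimal sketch of what the transaction log looks like on S3, assuming boto3 and a placeholder bucket name (neither appears in the application code). Each committed version is a single zero-padded JSON file, and two writers racing to create the same next filename is exactly the failure described above.

# Sketch: list the Delta transaction log directly on S3.
# The bucket name is a placeholder; requires boto3 and read access to the bucket.
import boto3

s3 = boto3.client("s3")
resp = s3.list_objects_v2(
    Bucket="my-data-lake-bucket",
    Prefix="annotations_table/_delta_log/",
)
for obj in resp.get("Contents", []):
    # e.g. annotations_table/_delta_log/00000000000000000042.json
    print(obj["Key"])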

Here is the initial, flawed Flask endpoint. It takes a payload representing a single row update, reads the Delta table, and executes a merge operation.

# /project/app/main/routes.py - The Naive Approach
import os
import logging
from flask import Blueprint, request, jsonify
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import ConcurrentWriteException
import pandas as pd

main = Blueprint('main', __name__)

# Basic configuration
S3_BUCKET = os.environ.get("S3_BUCKET")
TABLE_PATH = f"s3://{S3_BUCKET}/annotations_table"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

@main.route('/api/update_record', methods=['POST'])
def update_record():
    """
    A naive endpoint to update a single record in the Delta table.
    This will fail under concurrent load.
    """
    try:
        data = request.get_json()
        if not data or 'record_id' not in data or 'updated_values' not in data:
            return jsonify({"error": "Invalid payload"}), 400

        record_id = data['record_id']
        updates = data['updated_values']

        # Create a pandas DataFrame for the merge source
        update_df = pd.DataFrame([{'record_id': record_id, **updates}])
        
        storage_options = {
            "AWS_ACCESS_KEY_ID": os.environ.get("AWS_ACCESS_KEY_ID"),
            "AWS_SECRET_ACCESS_KEY": os.environ.get("AWS_SECRET_ACCESS_KEY"),
            "AWS_REGION": os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
        }

        logging.info(f"Attempting to merge update for record_id: {record_id}")
        
        # The core operation that is prone to race conditions
        dt = DeltaTable(TABLE_PATH, storage_options=storage_options)
        dt.merge(
            source=update_df,
            predicate=f"s.record_id = t.record_id",
            source_alias="s",
            target_alias="t"
        ).when_matched_update_all().execute()

        logging.info(f"Successfully merged update for record_id: {record_id}")
        return jsonify({"status": "success", "record_id": record_id}), 200

    except ConcurrentWriteException as e:
        logging.error(f"Concurrency conflict for record_id {record_id}: {e}")
        return jsonify({"error": "A concurrent modification occurred. Please try again."}), 409 # 409 Conflict

    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}", exc_info=True)
        return jsonify({"error": "An internal server error occurred."}), 500

The solution was not to avoid these exceptions—they are a feature, not a bug—but to build an application-level layer to handle them gracefully. This meant implementing a retry mechanism with exponential backoff and jitter. The goal is to catch the ConcurrentWriteException, wait for a short, randomized period, and then re-attempt the entire read-merge-write transaction. The Delta table object must be re-instantiated within the loop to ensure it reads the latest version of the transaction log before attempting the merge.

Here is the architecture of our retry logic, implemented as a Python decorator for clean, reusable application across different endpoints.

# /project/app/utils/concurrency.py - The Retry Decorator
import time
import random
import logging
from functools import wraps
from deltalake.exceptions import ConcurrentWriteException

# Configuration for retry logic
RETRY_CONFIG = {
    'MAX_RETRIES': 5,
    'INITIAL_BACKOFF_MS': 100,
    'MAX_BACKOFF_MS': 2000,
    'JITTER_FACTOR': 0.2
}

def delta_lake_retry(config=RETRY_CONFIG):
    """
    A decorator to handle Delta Lake's ConcurrentWriteException with
    exponential backoff and jitter.

    This wraps a function that performs a Delta Lake write operation. If a
    ConcurrentWriteException is caught, it will retry the function up to
    MAX_RETRIES times.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            backoff_ms = config['INITIAL_BACKOFF_MS']
            
            while retries < config['MAX_RETRIES']:
                try:
                    # Attempt the original function
                    return func(*args, **kwargs)
                except ConcurrentWriteException as e:
                    retries += 1
                    if retries >= config['MAX_RETRIES']:
                        logging.error(f"Final attempt failed after {retries} retries. Aborting. Error: {e}")
                        raise e # Re-raise the exception to be caught by the route handler

                    # Calculate jitter: a random value to prevent thundering herd
                    jitter = random.uniform(
                        -config['JITTER_FACTOR'] * backoff_ms,
                        config['JITTER_FACTOR'] * backoff_ms
                    )
                    
                    wait_time_ms = backoff_ms + jitter
                    wait_time_sec = wait_time_ms / 1000.0
                    
                    logging.warning(
                        f"ConcurrentWriteException caught on attempt {retries}. "
                        f"Retrying in {wait_time_sec:.2f} seconds. Error: {e}"
                    )
                    
                    time.sleep(wait_time_sec)
                    
                    # Exponentially increase backoff for next potential retry
                    backoff_ms = min(config['MAX_BACKOFF_MS'], backoff_ms * 2)

        return wrapper
    return decorator

With this decorator, the Flask route can be refactored. The core logic is now encapsulated in a separate function, which allows the decorator to wrap the entire transactional block.

# /project/app/main/routes.py - Refactored with Retry Logic
# ... (imports from above, plus the new decorator)
from ..utils.concurrency import delta_lake_retry

# ... (Blueprint, config, logging setup)

def _perform_merge_operation(table_path: str, storage_options: dict, update_df: pd.DataFrame):
    """
    The core logic of the merge operation, designed to be retried.
    A new DeltaTable object is instantiated on each call to ensure it's
    up-to-date with the transaction log.
    """
    logging.info(f"Performing merge for record_ids: {update_df['record_id'].tolist()}")
    dt = DeltaTable(table_path, storage_options=storage_options)
    
    # In a real-world project, the predicate should be as specific as possible
    # to reduce the surface area for conflicts. If you have a composite key,
    # use it here.
    dt.merge(
        source=update_df,
        predicate="s.record_id = t.record_id",
        source_alias="s",
        target_alias="t"
    ).when_matched_update_all().execute()
    logging.info("Merge operation completed successfully for this attempt.")


@main.route('/api/update_record_robust', methods=['POST'])
def update_record_robust():
    """
    A robust endpoint that uses a retry mechanism to handle concurrency.
    """
    try:
        data = request.get_json()
        if not data or 'record_id' not in data or 'updated_values' not in data:
            return jsonify({"error": "Invalid payload"}), 400

        record_id = data['record_id']
        updates = data['updated_values']
        update_df = pd.DataFrame([{'record_id': record_id, **updates}])
        
        storage_options = {
            "AWS_ACCESS_KEY_ID": os.environ.get("AWS_ACCESS_KEY_ID"),
            "AWS_SECRET_ACCESS_KEY": os.environ.get("AWS_SECRET_ACCESS_KEY"),
            "AWS_REGION": os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
        }
        
        # Wrap the core merge in the retry decorator for this request, then invoke it
        @delta_lake_retry()
        def decorated_merge():
            _perform_merge_operation(TABLE_PATH, storage_options, update_df)

        decorated_merge()

        return jsonify({"status": "success", "record_id": record_id}), 200

    except ConcurrentWriteException:
        # This is now the final failure case after all retries
        logging.error(f"Failed to merge record_id {record_id} after all retries.")
        return jsonify({
            "error": "High contention on this record. Please refresh the data and try again."
        }), 409

    except Exception as e:
        logging.error(f"An unexpected error occurred during robust update: {e}", exc_info=True)
        return jsonify({"error": "An internal server error occurred."}), 500

The interaction flow with this retry mechanism can be visualized as a sequence diagram.

sequenceDiagram
    participant ReactUI as React UI
    participant FlaskAPI as Flask API
    participant DeltaLake as Delta Lake (S3)

    ReactUI->>+FlaskAPI: POST /api/update_record_robust (data)
    FlaskAPI->>FlaskAPI: Start Retry Loop (Attempt 1)
    FlaskAPI->>+DeltaLake: Read _delta_log (gets version N)
    FlaskAPI->>FlaskAPI: Prepare MERGE commit for version N+1
    
    Note right of DeltaLake: At the same time, another request commits version N+1
    
    FlaskAPI->>DeltaLake: Attempt to write commit N+1
    DeltaLake-->>-FlaskAPI: Fail (ConcurrentWriteException)
    
    FlaskAPI->>FlaskAPI: Catch Exception, Log Warning
    FlaskAPI->>FlaskAPI: Wait (Exponential Backoff + Jitter)
    
    FlaskAPI->>FlaskAPI: Start Retry Loop (Attempt 2)
    FlaskAPI->>+DeltaLake: Read _delta_log (gets version N+1)
    FlaskAPI->>FlaskAPI: Prepare MERGE commit for version N+2
    FlaskAPI->>DeltaLake: Attempt to write commit N+2
    DeltaLake-->>-FlaskAPI: Success
    
    FlaskAPI-->>-ReactUI: 200 OK {"status": "success"}

On the frontend, the React component needs to handle the asynchronous nature of this call and provide clear feedback to the user. The API call might take longer than usual if retries are happening. The UI must reflect a “saving” state and properly handle both the final success and the potential 409 Conflict error.

// /client/src/components/DataEditorRow.js

import React, { useState, useCallback } from 'react';

// A simplified representation of a row in our data editor grid
export default function DataEditorRow({ initialRecord, onSaveSuccess }) {
    const [record, setRecord] = useState(initialRecord);
    const [isSaving, setIsSaving] = useState(false);
    const [error, setError] = useState(null);

    const handleInputChange = (e) => {
        const { name, value } = e.target;
        setRecord(prev => ({ ...prev, [name]: value }));
        setError(null); // Clear previous errors on new edit
    };

    const handleSave = useCallback(async () => {
        setIsSaving(true);
        setError(null);

        try {
            const response = await fetch('/api/update_record_robust', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({
                    record_id: record.record_id,
                    updated_values: {
                        label: record.label,
                        comment: record.comment
                    }
                }),
            });

            if (!response.ok) {
                // The server will respond with 409 if all retries fail
                if (response.status === 409) {
                    const errorData = await response.json();
                    throw new Error(errorData.error || 'Conflict detected. Please refresh.');
                }
                throw new Error(`Server responded with status: ${response.status}`);
            }

            const result = await response.json();
            if (onSaveSuccess) {
                onSaveSuccess(result); // Notify parent component
            }

        } catch (err) {
            console.error("Failed to save record:", err);
            setError(err.message);
        } finally {
            setIsSaving(false);
        }
    }, [record, onSaveSuccess]);

    return (
        <tr className={isSaving ? 'saving' : (error ? 'error' : '')}>
            <td>{record.record_id}</td>
            <td>
                <input
                    type="text"
                    name="label"
                    value={record.label}
                    onChange={handleInputChange}
                    disabled={isSaving}
                />
            </td>
            <td>
                <textarea
                    name="comment"
                    value={record.comment}
                    onChange={handleInputChange}
                    disabled={isSaving}
                />
            </td>
            <td>
                <button onClick={handleSave} disabled={isSaving}>
                    {isSaving ? 'Saving...' : 'Save'}
                </button>
                {error && <div className="error-message">{error}</div>}
            </td>
        </tr>
    );
}

Testing this concurrent behavior is non-trivial. Standard unit tests are insufficient. An integration test that simulates a race condition is required to have any confidence in the retry mechanism. Here’s a conceptual test using Python’s multiprocessing library with pytest. It sets up a temporary Delta table, then spawns multiple processes to hammer the same record via the Flask test client (provided by a client fixture; a sketch of one follows the test).

# /project/tests/test_concurrency.py
import pytest
import pandas as pd
from multiprocessing import Process, Queue
from deltalake import write_deltalake, DeltaTable

def update_process(app_client, record_id, new_label, result_queue):
    """Target function for each concurrent process.

    Passing the Flask test client into child processes relies on the 'fork'
    start method (the default on Linux); under 'spawn' the client would need
    to be constructed inside each child instead.
    """
    try:
        payload = {
            'record_id': record_id,
            'updated_values': {'label': new_label}
        }
        response = app_client.post('/api/update_record_robust', json=payload)
        result_queue.put({'status': response.status_code, 'json': response.get_json()})
    except Exception as e:
        result_queue.put({'status': 500, 'error': str(e)})

@pytest.fixture(scope="module")
def temp_delta_table(tmp_path_factory):
    """Creates a temporary Delta table for the test module."""
    path = tmp_path_factory.mktemp("delta_table")
    df = pd.DataFrame({
        "record_id": ["abc-123", "def-456"],
        "label": ["initial_A", "initial_B"],
        "partition_key": ["part1", "part1"]
    })
    write_deltalake(path, df, partition_by=["partition_key"])
    return str(path)

def test_concurrent_updates_on_same_record(client, temp_delta_table, monkeypatch):
    """
    Simulates multiple users trying to update the exact same record simultaneously.
    We expect one to succeed and the others to fail with a 409 after retries,
    or for them to succeed sequentially if retries work as intended. The final
    state of the table must be consistent.
    """
    # Patch the TABLE_PATH to point to our temporary test table
    monkeypatch.setattr('app.main.routes.TABLE_PATH', temp_delta_table)

    num_processes = 4
    processes = []
    result_queue = Queue()
    
    # All processes will try to update the same record_id to a different value
    updates = [f"update_{i}" for i in range(num_processes)]

    for i in range(num_processes):
        p = Process(target=update_process, args=(client, "abc-123", updates[i], result_queue))
        processes.append(p)
        p.start()

    for p in processes:
        p.join(timeout=30) # Wait for processes to finish, with a timeout

    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    
    assert len(results) == num_processes

    success_count = sum(1 for r in results if r['status'] == 200)
    conflict_count = sum(1 for r in results if r['status'] == 409)

    # In a heavily contended scenario, we expect at least one success.
    # The others may succeed after retries or eventually fail.
    # The key is that the system remains stable and doesn't corrupt data.
    assert success_count > 0
    assert success_count + conflict_count == num_processes

    # Final check: The data in the table should reflect one of the successful updates.
    dt = DeltaTable(temp_delta_table)
    final_df = dt.to_pandas()
    final_record = final_df[final_df['record_id'] == 'abc-123'].iloc[0]
    
    # The final label must be one of the attempted updates
    assert final_record['label'] in updates
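
The test relies on a client fixture for the Flask test client, typically defined in conftest.py. Here is a minimal sketch of one, assuming a hypothetical create_app() application factory; adapt it to however the app is actually constructed.

# /project/tests/conftest.py - Sketch of the `client` fixture assumed above.
# `create_app` is a hypothetical application factory; adjust to match the
# project's actual app construction.
import pytest
from app import create_app

@pytest.fixture(scope="module")
def client():
    app = create_app()
    app.config.update(TESTING=True)
    with app.test_client() as test_client:
        yield test_client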

This architecture, while effective, is not a silver bullet. The retry mechanism handles low-to-moderate write contention gracefully but has its limits. If dozens of users are attempting to modify the same data partition every second, the likelihood of a request failing after all retries increases significantly, and API latency will suffer. The current configuration is tuned for a scenario with a few dozen concurrent editors, not a high-throughput transactional system.
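
For a sense of the latency ceiling the retry layer adds, here is a back-of-the-envelope calculation mirroring the decorator's loop under the RETRY_CONFIG above, with jitter excluded.

# Worst case for RETRY_CONFIG: the request sleeps between attempts but not
# after the final one, so four sleeps in total (100 + 200 + 400 + 800 ms).
config = {'MAX_RETRIES': 5, 'INITIAL_BACKOFF_MS': 100, 'MAX_BACKOFF_MS': 2000}

total_ms, backoff_ms = 0, config['INITIAL_BACKOFF_MS']
for _ in range(config['MAX_RETRIES'] - 1):
    total_ms += backoff_ms
    backoff_ms = min(config['MAX_BACKOFF_MS'], backoff_ms * 2)

print(total_ms)  # 1500 ms of pure backoff, plus the cost of five merge attempts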

Further optimizations could involve narrowing the merge itself. If a user is only editing a single column, the when_matched_update clause can be restricted to that column rather than using when_matched_update_all, reducing the chance of overwriting another user's concurrent edit to a different column on the same row (see the sketch below). For extremely high-contention scenarios, a more sophisticated approach might be required, such as an in-memory locking service (e.g., Redis) to serialize access to specific records before the Delta Lake transaction is even attempted. That, however, introduces its own state management complexity, and for our use case, the robust application-level retry provided the right balance of implementation simplicity and production stability.
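
A minimal sketch of that column-scoped merge, assuming a hypothetical edited_columns list derived from the request payload (it is not part of the current endpoint):

# Sketch: update only the columns the user actually touched, rather than
# when_matched_update_all(). `edited_columns` is a hypothetical parameter,
# e.g. ["label"] or ["comment"], derived from the request payload.
def _perform_scoped_merge(table_path: str, storage_options: dict,
                          update_df: pd.DataFrame, edited_columns: list):
    dt = DeltaTable(table_path, storage_options=storage_options)
    updates = {col: f"s.{col}" for col in edited_columns}
    dt.merge(
        source=update_df,
        predicate="s.record_id = t.record_id",
        source_alias="s",
        target_alias="t"
    ).when_matched_update(updates=updates).execute()

Both writers still rewrite the same underlying data file, so the commit-level retry remains necessary; the narrower update simply avoids touching columns the user never edited.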

