Implementing a Queryable Parquet-Based Data Lake on DigitalOcean for Cross-Stack Telemetry Aggregation


The cost of third-party observability platforms became unsustainable. With a growing number of Python microservices and a complex single-page application built with Webpack, all running on DigitalOcean Droplets, the volume of telemetry data—frontend errors, API performance metrics, and backend application logs—was exploding. We were flying blind, reverting to ssh and grep during incidents, a process that was neither scalable nor efficient. The immediate technical pain point was the lack of a centralized, queryable system for our operational data that didn’t require a six-figure annual budget.

Our initial concept was a barebones log aggregation pipeline. The plan was to funnel all telemetry as JSON blobs to a single endpoint and dump them into DigitalOcean Spaces. This seemed simple enough, but the real challenge emerged when we considered the query requirements. How could a developer efficiently find all frontend errors for a specific user ID from the last 72 hours without downloading and parsing terabytes of gzipped JSON? This question forced a significant evolution of the architecture. A simple log dump wasn’t enough; we needed a pragmatic, low-cost data lake.

The technology selection was driven by our existing ecosystem and a mandate for cost-effectiveness.

  1. Infrastructure: DigitalOcean was the given. We leveraged Droplets for services, App Platform for the query API, Functions for serverless ingestion, and Spaces for object storage. Spaces is S3-compatible, which was a critical factor for library support.
  2. Data Processing & APIs: Python was the natural choice. Our backend team was proficient in it, and the data ecosystem with libraries like pandas, pyarrow, and fastapi is unparalleled. It provided the tools to handle everything from data ingestion and transformation to serving query results.
  3. Data Lake Format: Storing raw JSON was discarded due to abysmal query performance. We opted for Apache Parquet. Its columnar storage format means a query for user_id and error_message only reads data for those columns, drastically reducing I/O compared to row-based formats like JSON or CSV. Furthermore, Parquet’s support for partitioning would allow us to physically separate data on disk by date and event type, making time-series and categorical queries extremely fast.
  4. Frontend Interface: Our existing SPA was built and bundled with Webpack. Creating a new, lightweight dashboard page within the same project was the path of least resistance. We could reuse existing components and authentication logic.

The architecture crystallized into four distinct stages: serverless ingestion, batch ETL, a query API, and a frontend client.

graph TD
    subgraph Clients
        A[Frontend App] --> B
        C[Python Service 1] --> B
        D[Python Service 2] --> B
    end

    subgraph Ingestion [Stage 1: Serverless Ingestion]
        B(DigitalOcean Function) -- JSON Payload --> E{DO Spaces: Raw Bucket}
    end

    subgraph ETL [Stage 2: Batch ETL]
        F(Scheduled Python Job on Droplet) -- Reads Raw JSON --> E
        F -- Writes Partitioned Parquet --> G{DO Spaces: Processed Bucket}
    end

    subgraph Querying [Stage 3 & 4: Query & Visualization]
        G -- Data Source --> H(Python Query API)
        H -- REST API --> I(Webpack SPA Dashboard)
        I -- User Queries --> H
    end

    style Ingestion fill:#f9f,stroke:#333,stroke-width:2px
    style ETL fill:#ccf,stroke:#333,stroke-width:2px
    style Querying fill:#cfc,stroke:#333,stroke-width:2px

Stage 1: The Firehose Ingestion Endpoint

The first component is a serverless function on DigitalOcean that acts as a simple, highly available ingestion endpoint. Its only job is to accept POST requests with JSON payloads, add a server-side timestamp, and write the data to a “raw” bucket in DigitalOcean Spaces. A key design decision here is to avoid any processing or validation at this stage. The goal is to capture data as quickly and reliably as possible.

A real-world project requires robust configuration management, not hardcoded strings. We manage this through environment variables, which DigitalOcean Functions supports natively.

project.yml:

packages:
  - name: telemetry-ingest
    actions:
      - name: ingest
        runtime: python:3.9
        main: main
        environment:
          SPACES_ENDPOINT_URL: ${SPACES_ENDPOINT_URL}
          SPACES_ACCESS_KEY: ${SPACES_ACCESS_KEY}
          SPACES_SECRET_KEY: ${SPACES_SECRET_KEY}
          RAW_DATA_BUCKET: "company-telemetry-raw"

The Python function itself uses boto3, the AWS SDK for Python, which works seamlessly with S3-compatible services like Spaces.

packages/telemetry-ingest/ingest/main.py:

import os
import json
import uuid
import boto3
from botocore.client import Config
from datetime import datetime
import logging

# Basic logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# A common mistake is to create a new client for every invocation.
# In a serverless environment, global-level initialization allows for connection reuse
# across invocations within the same container instance.
session = boto3.session.Session()
client = session.client('s3',
                        region_name='nyc3',
                        endpoint_url=os.environ.get('SPACES_ENDPOINT_URL'),
                        aws_access_key_id=os.environ.get('SPACES_ACCESS_KEY'),
                        aws_secret_access_key=os.environ.get('SPACES_SECRET_KEY'),
                        config=Config(s3={'addressing_style': 'virtual'}))

RAW_DATA_BUCKET = os.environ.get('RAW_DATA_BUCKET')

def main(args):
    """
    DigitalOcean Function entry point.
    Accepts a JSON payload and writes it to a timestamped path in Spaces.
    """
    try:
        # The body is passed as a string in the args dictionary
        payload = args.get('__ow_body', '{}')
        if not isinstance(payload, str):
            payload = json.dumps(payload)
        
        data = json.loads(payload)

        # Enrich with server-side metadata
        server_timestamp = datetime.utcnow()
        data['_server_timestamp_utc'] = server_timestamp.isoformat()
        data['_ingest_id'] = str(uuid.uuid4())

        # The object key structure is critical for organization and avoiding "hot spots".
        # YYYY/MM/DD/HH partitioning is a standard practice for time-series data.
        object_key = (
            f"{server_timestamp.strftime('%Y/%m/%d/%H')}/"
            f"{server_timestamp.isoformat()}_{uuid.uuid4()}.json"
        )

        client.put_object(
            Bucket=RAW_DATA_BUCKET,
            Key=object_key,
            Body=json.dumps(data).encode('utf-8'),
            ContentType='application/json'
        )
        
        return {
            "statusCode": 202,
            "headers": { 'Content-Type': 'application/json' },
            "body": { 'status': 'accepted', 'key': object_key }
        }

    except json.JSONDecodeError:
        logger.error("Failed to decode JSON payload")
        return { "statusCode": 400, "body": { "error": "Invalid JSON payload" } }
    except Exception as e:
        # In a real-world project, you'd have more specific exception handling.
        # This catch-all prevents the function from crashing and provides some insight.
        logger.exception(f"An unexpected error occurred: {e}")
        return { "statusCode": 500, "body": { "error": "Internal server error during ingestion" } }

This approach is simple and horizontally scalable. The main pitfall to avoid is performing synchronous, heavy processing here. Any delay increases the chance of timeouts and data loss. The sole responsibility is durable persistence.

Stage 2: The Parquet ETL Converter

This is the core of the data lake transformation. A scheduled Python script runs periodically (e.g., hourly via a cron job on a utility Droplet) to convert the raw, schemaless JSON files into structured, partitioned Parquet files.

This job faces a few challenges:

  1. State Management: It needs to know which JSON files it has already processed to avoid duplication.
  2. Schema Enforcement: Telemetry data can be messy. The job must handle missing keys, incorrect data types, and evolve as new fields are added.
  3. Efficiency: It must process potentially thousands of small JSON files without consuming excessive memory or time.

Here’s a simplified but production-grade implementation of the ETL job.

etl_runner/config.py:

import os
from dotenv import load_dotenv

load_dotenv()

class Settings:
    SPACES_ENDPOINT_URL = os.environ.get('SPACES_ENDPOINT_URL')
    SPACES_ACCESS_KEY = os.environ.get('SPACES_ACCESS_KEY')
    SPACES_SECRET_KEY = os.environ.get('SPACES_SECRET_KEY')
    
    RAW_BUCKET = "company-telemetry-raw"
    PROCESSED_BUCKET = "company-telemetry-processed"
    
    # In a production system, this would be a persistent store like Redis or a database file.
    # For simplicity, we use a local file to track processed files.
    STATE_FILE_PATH = "./processed_files.log"

settings = Settings()

etl_runner/process.py:

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from botocore.client import Config
from datetime import datetime, timedelta
import logging
import io

from config import settings

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class ETLProcessor:
    def __init__(self):
        self.s3_client = self._get_s3_client()
        self.processed_files = self._load_state()

    def _get_s3_client(self):
        session = boto3.session.Session()
        return session.client('s3',
                              region_name='nyc3',
                              endpoint_url=settings.SPACES_ENDPOINT_URL,
                              aws_access_key_id=settings.SPACES_ACCESS_KEY,
                              aws_secret_access_key=settings.SPACES_SECRET_KEY,
                              config=Config(s3={'addressing_style': 'virtual'}))

    def _load_state(self):
        try:
            with open(settings.STATE_FILE_PATH, 'r') as f:
                return set(line.strip() for line in f)
        except FileNotFoundError:
            return set()

    def _save_state(self):
        with open(settings.STATE_FILE_PATH, 'w') as f:
            for file_key in sorted(list(self.processed_files)):
                f.write(f"{file_key}\n")

    def list_new_files(self, prefix):
        paginator = self.s3_client.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=settings.RAW_BUCKET, Prefix=prefix)
        for page in pages:
            if 'Contents' in page:
                for obj in page['Contents']:
                    if obj['Key'] not in self.processed_files:
                        yield obj['Key']

    def process_batch(self, file_keys):
        records = []
        processed_in_batch = []
        for key in file_keys:
            try:
                response = self.s3_client.get_object(Bucket=settings.RAW_BUCKET, Key=key)
                content = response['Body'].read().decode('utf-8')
                data = json.loads(content)
                records.append(data)
                processed_in_batch.append(key)
            except Exception as e:
                logging.error(f"Failed to process file {key}: {e}")
                # In production, move failed files to a 'dead-letter' queue/prefix for inspection.
        
        if not records:
            return []

        df = pd.DataFrame(records)
        
        # --- Schema Enforcement and Typing ---
        # This section is critical and often becomes complex.
        # Here we ensure required columns exist and cast types.
        df['event_timestamp'] = pd.to_datetime(df.get('timestamp_utc', pd.NaT))
        df['event_type'] = df.get('event_type', 'unknown').astype('category')
        df['user_id'] = df.get('user_id', -1).astype(int)
        
        # Extract date for partitioning
        df['date'] = df['event_timestamp'].dt.date
        
        return df, processed_in_batch

    def write_to_parquet(self, df):
        if df.empty:
            return
        
        # Convert to Arrow Table for efficient writing
        table = pa.Table.from_pandas(df, preserve_index=False)
        
        # This is where the magic happens. We write the data to the processed bucket,
        # partitioned by 'event_type' and 'date'. This creates a directory structure
        # like: s3://processed-bucket/event_type=api_error/date=2023-10-27/file.parquet
        pq.write_to_dataset(
            table,
            root_path=f"s3://{settings.PROCESSED_BUCKET}",
            partition_cols=['event_type', 'date'],
            filesystem=pa.fs.S3FileSystem(
                endpoint_override=settings.SPACES_ENDPOINT_URL,
                access_key=settings.SPACES_ACCESS_KEY,
                secret_key=settings.SPACES_SECRET_KEY,
                region='nyc3'
            )
        )
        logging.info(f"Wrote {len(df)} records to partitioned Parquet dataset.")

    def run(self):
        # Process data from the last 2 hours to handle any ingestion delays.
        now = datetime.utcnow()
        prefixes_to_scan = [
            (now - timedelta(hours=i)).strftime('%Y/%m/%d/%H') for i in range(2)
        ]
        
        all_new_files = []
        for prefix in prefixes_to_scan:
            logging.info(f"Scanning for new files in prefix: {prefix}")
            all_new_files.extend(list(self.list_new_files(prefix)))
        
        if not all_new_files:
            logging.info("No new files to process.")
            return

        logging.info(f"Found {len(all_new_files)} new files to process.")
        
        # Process in chunks to manage memory
        chunk_size = 1000
        for i in range(0, len(all_new_files), chunk_size):
            batch_keys = all_new_files[i:i + chunk_size]
            df, processed_keys = self.process_batch(batch_keys)
            
            if not df.empty:
                self.write_to_parquet(df)
                
            self.processed_files.update(processed_keys)
        
        self._save_state()
        logging.info("ETL run completed.")

if __name__ == "__main__":
    processor = ETLProcessor()
    processor.run()

A common mistake is to read all JSON files into memory at once. This works for small volumes but quickly fails. Processing in batches and using generators is key to memory efficiency. The state management here is file-based for simplicity, but a more robust solution might use a small SQLite database or a key-value store.

Stage 3: The Query API

With the data neatly organized in a partitioned Parquet dataset, we need an API to query it. This Python service, built with FastAPI for its performance and automatic documentation, will be the bridge between our data lake and the frontend. It will not use a traditional database. Instead, it will leverage pyarrow‘s dataset capabilities to read directly from Spaces, applying filters at the storage layer whenever possible.

query_api/main.py:

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
from datetime import date
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pyarrow.fs as fs
import os

app = FastAPI()

# Configuration should be loaded from environment variables
S3_FILESYSTEM = fs.S3FileSystem(
    endpoint_override=os.environ.get('SPACES_ENDPOINT_URL'),
    access_key=os.environ.get('SPACES_ACCESS_KEY'),
    secret_key=os.environ.get('SPACES_SECRET_KEY'),
    region='nyc3'
)
PROCESSED_BUCKET = os.environ.get('PROCESSED_BUCKET', "company-telemetry-processed")

class QueryResponse(BaseModel):
    data: List[Dict[str, Any]]
    count: int
    message: Optional[str] = None

@app.get("/query", response_model=QueryResponse)
async def perform_query(
    start_date: date,
    end_date: date,
    event_type: str = Query(..., description="The type of event to query, e.g., 'frontend_error'"),
    user_id: Optional[int] = None,
    limit: int = Query(100, ge=1, le=1000)
):
    """
    Queries the Parquet data lake.
    """
    filters = []
    
    # --- Filter Pushdown ---
    # This is the most important optimization. By specifying filters here,
    # pyarrow can leverage the partitioning scheme. It will only look inside
    # directories matching event_type and the date range.
    
    # 1. Partition filter for event_type
    filters.append(ds.field('event_type') == event_type)
    
    # 2. Partition filter for date range
    # A pitfall is creating a filter for every day. It's more efficient
    # to create a range filter if the underlying library supports it.
    filters.append(ds.field('date') >= start_date)
    filters.append(ds.field('date') <= end_date)

    # 3. Columnar filter for other fields
    if user_id is not None:
        filters.append(ds.field('user_id') == user_id)
        
    combined_filter = pa.compute.and_(*filters)

    try:
        dataset = ds.dataset(
            f"s3://{PROCESSED_BUCKET}",
            filesystem=S3_FILESYSTEM,
            format="parquet",
            partitioning=["event_type", "date"]
        )

        scanner = dataset.scanner(filter=combined_filter, batch_size=100_000)
        
        # The query is executed here. We convert to a Pandas DataFrame for easy serialization.
        table = scanner.to_table()
        df = table.to_pandas()
        
        # Sort and limit results before sending to the client
        df_sorted = df.sort_values(by='_server_timestamp_utc', ascending=False)
        result_df = df_sorted.head(limit)
        
        # Convert NaT/NaN to None for valid JSON
        result_df = result_df.where(pd.notnull(result_df), None)

        return {
            "data": result_df.to_dict('records'),
            "count": len(result_df)
        }

    except Exception as e:
        # In production, log the full exception
        raise HTTPException(status_code=500, detail=f"Error querying data lake: {str(e)}")

The key piece of code is ds.dataset with the partitioning argument. This tells PyArrow about our directory structure. When we build a filter on event_type or date, PyArrow is smart enough to translate that into a list of specific directories to scan, dramatically pruning the search space. This is filter pushdown in action, and it’s what makes this “database-less” approach viable.

Stage 4: The Webpack Frontend

The final piece is the user interface. We need a simple dashboard page where developers can select a date range, event type, and other filters to view the telemetry data. This is built as a new route within our existing React application, bundled by Webpack.

For production builds, the Webpack configuration is tuned for performance.

webpack.prod.config.js (excerpt):

// ... other configurations
module.exports = {
  mode: 'production',
  entry: {
    main: './src/index.js',
    observability: './src/observability/index.js', // Separate entry for the dashboard
  },
  output: {
    filename: '[name].[contenthash].js',
    path: path.resolve(__dirname, 'dist'),
    clean: true,
  },
  optimization: {
    // This splits vendor code (react, etc.) from our app code,
    // improving caching for repeat visitors.
    splitChunks: {
      chunks: 'all',
    },
    // Minimizes JS and CSS
    minimizer: [new TerserPlugin(), new CssMinimizerPlugin()],
  },
  // ... loaders for babel, css, etc.
};

The critical part is creating a separate entry point (observability) and using splitChunks. This ensures that users visiting the main application don’t have to download the code for the observability dashboard, which they might rarely use.

A React component for the dashboard might look like this:

src/observability/Dashboard.jsx:

import React, { useState, useEffect, useCallback } from 'react';

// A real implementation would use a more robust data grid component
function ResultsTable({ data }) {
  if (!data || data.length === 0) {
    return <p>No results found.</p>;
  }
  const headers = Object.keys(data[0]);
  return (
    <table>
      <thead>
        <tr>
          {headers.map(h => <th key={h}>{h}</th>)}
        </tr>
      </thead>
      <tbody>
        {data.map((row, i) => (
          <tr key={i}>
            {headers.map(h => <td key={h}>{JSON.stringify(row[h])}</td>)}
          </tr>
        ))}
      </tbody>
    </table>
  );
}

export function ObservabilityDashboard() {
  const [results, setResults] = useState(null);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState('');
  
  // Form state
  const [startDate, setStartDate] = useState('2023-10-26');
  const [endDate, setEndDate] = useState('2023-10-27');
  const [eventType, setEventType] = useState('frontend_error');

  const fetchData = useCallback(async () => {
    setLoading(true);
    setError('');
    const params = new URLSearchParams({
      start_date: startDate,
      end_date: endDate,
      event_type: eventType,
      limit: 100
    });

    try {
      // The API endpoint would be proxied in development via webpack.devServer.js
      const response = await fetch(`/api/query?${params.toString()}`);
      if (!response.ok) {
        throw new Error(`API Error: ${response.statusText}`);
      }
      const data = await response.json();
      setResults(data.data);
    } catch (err) {
      setError(err.message);
      setResults(null);
    } finally {
      setLoading(false);
    }
  }, [startDate, endDate, eventType]);

  return (
    <div>
      <h1>Telemetry Query Interface</h1>
      {/* Form controls for start_date, end_date, event_type etc. would go here */}
      <button onClick={fetchData} disabled={loading}>
        {loading ? 'Querying...' : 'Run Query'}
      </button>

      {error && <div style={{ color: 'red' }}>{error}</div>}
      
      {results && <ResultsTable data={results} />}
    </div>
  );
}

This completes the loop. A developer can now load this page, set filters, and get back structured telemetry data, all powered by our budget-friendly data lake on DigitalOcean.

The primary limitation of this architecture is its near-line nature, not real-time. There is an inherent delay dictated by the ETL job’s schedule. For incident response, an hour-long delay might be too long. Furthermore, the query performance, while vastly superior to scanning JSON, is still dependent on a single API instance downloading and processing data from Spaces. It cannot compete with distributed query engines like Presto or ClickHouse for complex analytical queries over massive datasets. Future iterations could explore using DuckDB within the Python API for more powerful, in-process SQL queries on the Parquet files or introducing a stream processing element with a tool like Kafka if real-time needs become paramount. The system’s applicability boundary lies where query latency in minutes is unacceptable or when ad-hoc analytical queries become more important than simple filtered lookups.


  TOC