The cost of third-party observability platforms became unsustainable. With a growing number of Python microservices and a complex single-page application built with Webpack, all running on DigitalOcean Droplets, the volume of telemetry data—frontend errors, API performance metrics, and backend application logs—was exploding. We were flying blind, reverting to `ssh` and `grep` during incidents, a process that was neither scalable nor efficient. The immediate technical pain point was the lack of a centralized, queryable system for our operational data that didn’t require a six-figure annual budget.
Our initial concept was a barebones log aggregation pipeline. The plan was to funnel all telemetry as JSON blobs to a single endpoint and dump them into DigitalOcean Spaces. This seemed simple enough, but the real challenge emerged when we considered the query requirements. How could a developer efficiently find all frontend errors for a specific user ID from the last 72 hours without downloading and parsing terabytes of gzipped JSON? This question forced a significant evolution of the architecture. A simple log dump wasn’t enough; we needed a pragmatic, low-cost data lake.
The technology selection was driven by our existing ecosystem and a mandate for cost-effectiveness.
- Infrastructure: DigitalOcean was the given. We leveraged Droplets for services, App Platform for the query API, Functions for serverless ingestion, and Spaces for object storage. Spaces is S3-compatible, which was a critical factor for library support.
- Data Processing & APIs: Python was the natural choice. Our backend team was proficient in it, and the data ecosystem with libraries like `pandas`, `pyarrow`, and `fastapi` is unparalleled. It provided the tools to handle everything from data ingestion and transformation to serving query results.
- Data Lake Format: Storing raw JSON was discarded due to abysmal query performance. We opted for Apache Parquet. Its columnar storage format means a query for `user_id` and `error_message` only reads data for those columns, drastically reducing I/O compared to row-based formats like JSON or CSV (see the short sketch after this list). Furthermore, Parquet’s support for partitioning would allow us to physically separate data on disk by date and event type, making time-series and categorical queries extremely fast.
- Frontend Interface: Our existing SPA was built and bundled with Webpack. Creating a new, lightweight dashboard page within the same project was the path of least resistance. We could reuse existing components and authentication logic.
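To make the columnar point concrete, here is a minimal sketch of reading just those two columns with `pyarrow`; the file path is hypothetical, and only the matching column chunks are fetched from the file rather than whole rows:

import pyarrow.parquet as pq

# Read only two columns from a Parquet file; a row-based format (JSON/CSV)
# would force a scan of every byte of every record. Path is illustrative.
table = pq.read_table("telemetry-sample.parquet", columns=["user_id", "error_message"])
print(table.num_rows, table.column_names)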
The architecture crystallized into four distinct stages: serverless ingestion, batch ETL, a query API, and a frontend client.
graph TD
    subgraph Clients
        A[Frontend App] --> B
        C[Python Service 1] --> B
        D[Python Service 2] --> B
    end
    subgraph Ingestion [Stage 1: Serverless Ingestion]
        B(DigitalOcean Function) -- JSON Payload --> E{DO Spaces: Raw Bucket}
    end
    subgraph ETL [Stage 2: Batch ETL]
        F(Scheduled Python Job on Droplet) -- Reads Raw JSON --> E
        F -- Writes Partitioned Parquet --> G{DO Spaces: Processed Bucket}
    end
    subgraph Querying [Stage 3 & 4: Query & Visualization]
        G -- Data Source --> H(Python Query API)
        H -- REST API --> I(Webpack SPA Dashboard)
        I -- User Queries --> H
    end
    style Ingestion fill:#f9f,stroke:#333,stroke-width:2px
    style ETL fill:#ccf,stroke:#333,stroke-width:2px
    style Querying fill:#cfc,stroke:#333,stroke-width:2px
Stage 1: The Firehose Ingestion Endpoint
The first component is a serverless function on DigitalOcean that acts as a simple, highly available ingestion endpoint. Its only job is to accept POST requests with JSON payloads, add a server-side timestamp, and write the data to a “raw” bucket in DigitalOcean Spaces. A key design decision here is to avoid any processing or validation at this stage. The goal is to capture data as quickly and reliably as possible.
A real-world project requires robust configuration management, not hardcoded strings. We manage this through environment variables, which DigitalOcean Functions supports natively.
`project.yml`:
packages:
- name: telemetry-ingest
actions:
- name: ingest
runtime: python:3.9
main: main
environment:
SPACES_ENDPOINT_URL: ${SPACES_ENDPOINT_URL}
SPACES_ACCESS_KEY: ${SPACES_ACCESS_KEY}
SPACES_SECRET_KEY: ${SPACES_SECRET_KEY}
RAW_DATA_BUCKET: "company-telemetry-raw"
The Python function itself uses `boto3`, the AWS SDK for Python, which works seamlessly with S3-compatible services like Spaces.
`packages/telemetry-ingest/ingest/main.py`:
import os
import json
import uuid
import boto3
from botocore.client import Config
from datetime import datetime
import logging
# Basic logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# A common mistake is to create a new client for every invocation.
# In a serverless environment, global-level initialization allows for connection reuse
# across invocations within the same container instance.
session = boto3.session.Session()
client = session.client('s3',
region_name='nyc3',
endpoint_url=os.environ.get('SPACES_ENDPOINT_URL'),
aws_access_key_id=os.environ.get('SPACES_ACCESS_KEY'),
aws_secret_access_key=os.environ.get('SPACES_SECRET_KEY'),
config=Config(s3={'addressing_style': 'virtual'}))
RAW_DATA_BUCKET = os.environ.get('RAW_DATA_BUCKET')
def main(args):
"""
DigitalOcean Function entry point.
Accepts a JSON payload and writes it to a timestamped path in Spaces.
"""
try:
# The body is passed as a string in the args dictionary
payload = args.get('__ow_body', '{}')
if not isinstance(payload, str):
payload = json.dumps(payload)
data = json.loads(payload)
# Enrich with server-side metadata
server_timestamp = datetime.utcnow()
data['_server_timestamp_utc'] = server_timestamp.isoformat()
data['_ingest_id'] = str(uuid.uuid4())
# The object key structure is critical for organization and avoiding "hot spots".
# YYYY/MM/DD/HH partitioning is a standard practice for time-series data.
object_key = (
f"{server_timestamp.strftime('%Y/%m/%d/%H')}/"
f"{server_timestamp.isoformat()}_{uuid.uuid4()}.json"
)
client.put_object(
Bucket=RAW_DATA_BUCKET,
Key=object_key,
Body=json.dumps(data).encode('utf-8'),
ContentType='application/json'
)
return {
"statusCode": 202,
"headers": { 'Content-Type': 'application/json' },
"body": { 'status': 'accepted', 'key': object_key }
}
except json.JSONDecodeError:
logger.error("Failed to decode JSON payload")
return { "statusCode": 400, "body": { "error": "Invalid JSON payload" } }
except Exception as e:
# In a real-world project, you'd have more specific exception handling.
# This catch-all prevents the function from crashing and provides some insight.
logger.exception(f"An unexpected error occurred: {e}")
return { "statusCode": 500, "body": { "error": "Internal server error during ingestion" } }
This approach is simple and horizontally scalable. The main pitfall to avoid is performing synchronous, heavy processing here. Any delay increases the chance of timeouts and data loss. The sole responsibility is durable persistence.
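For reference, a backend service might report an event with a few lines of Python. This is a hedged sketch: the function URL is a placeholder, and the field names simply match the schema the ETL job expects.

import requests

# Placeholder URL; the real one comes from the DigitalOcean Functions console.
INGEST_URL = "https://faas-nyc1-example.doserverless.co/api/v1/web/fn-namespace/telemetry-ingest/ingest"

event = {
    "event_type": "api_error",
    "user_id": 42,
    "timestamp_utc": "2023-10-27T14:03:11Z",
    "error_message": "upstream timeout calling billing service",
}

try:
    # Short timeout and a swallowed exception: telemetry must never block or
    # break the request path it is observing.
    requests.post(INGEST_URL, json=event, timeout=2)
except requests.RequestException:
    pass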
Stage 2: The Parquet ETL Converter
This is the core of the data lake transformation. A scheduled Python script runs periodically (e.g., hourly via a cron job on a utility Droplet) to convert the raw, schemaless JSON files into structured, partitioned Parquet files.
This job faces a few challenges:
- State Management: It needs to know which JSON files it has already processed to avoid duplication.
- Schema Enforcement: Telemetry data can be messy. The job must handle missing keys, incorrect data types, and evolve as new fields are added.
- Efficiency: It must process potentially thousands of small JSON files without consuming excessive memory or time.
Here’s a simplified implementation of the ETL job that still addresses the production concerns above: state tracking, schema enforcement, and batched processing.
`etl_runner/config.py`:
import os
from dotenv import load_dotenv
load_dotenv()
class Settings:
SPACES_ENDPOINT_URL = os.environ.get('SPACES_ENDPOINT_URL')
SPACES_ACCESS_KEY = os.environ.get('SPACES_ACCESS_KEY')
SPACES_SECRET_KEY = os.environ.get('SPACES_SECRET_KEY')
RAW_BUCKET = "company-telemetry-raw"
PROCESSED_BUCKET = "company-telemetry-processed"
# In a production system, this would be a persistent store like Redis or a database file.
# For simplicity, we use a local file to track processed files.
STATE_FILE_PATH = "./processed_files.log"
settings = Settings()
`etl_runner/process.py`:
import json
import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.fs  # makes pa.fs.S3FileSystem (used below) available
import pyarrow.parquet as pq
from botocore.client import Config
from datetime import datetime, timedelta
import logging
from config import settings
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ETLProcessor:
def __init__(self):
self.s3_client = self._get_s3_client()
self.processed_files = self._load_state()
def _get_s3_client(self):
session = boto3.session.Session()
return session.client('s3',
region_name='nyc3',
endpoint_url=settings.SPACES_ENDPOINT_URL,
aws_access_key_id=settings.SPACES_ACCESS_KEY,
aws_secret_access_key=settings.SPACES_SECRET_KEY,
config=Config(s3={'addressing_style': 'virtual'}))
def _load_state(self):
try:
with open(settings.STATE_FILE_PATH, 'r') as f:
return set(line.strip() for line in f)
except FileNotFoundError:
return set()
def _save_state(self):
with open(settings.STATE_FILE_PATH, 'w') as f:
for file_key in sorted(list(self.processed_files)):
f.write(f"{file_key}\n")
def list_new_files(self, prefix):
paginator = self.s3_client.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=settings.RAW_BUCKET, Prefix=prefix)
for page in pages:
if 'Contents' in page:
for obj in page['Contents']:
if obj['Key'] not in self.processed_files:
yield obj['Key']
def process_batch(self, file_keys):
records = []
processed_in_batch = []
for key in file_keys:
try:
response = self.s3_client.get_object(Bucket=settings.RAW_BUCKET, Key=key)
content = response['Body'].read().decode('utf-8')
data = json.loads(content)
records.append(data)
processed_in_batch.append(key)
except Exception as e:
logging.error(f"Failed to process file {key}: {e}")
# In production, move failed files to a 'dead-letter' queue/prefix for inspection.
        if not records:
            # Return an empty frame plus an empty key list so the caller's
            # tuple unpacking still works.
            return pd.DataFrame(), []
df = pd.DataFrame(records)
# --- Schema Enforcement and Typing ---
# This section is critical and often becomes complex.
# Here we ensure required columns exist and cast types.
        # df.get() returns a scalar default when a column is absent, so normalize
        # to real columns before casting to avoid AttributeError on scalars.
        df['event_timestamp'] = pd.to_datetime(df.get('timestamp_utc'), errors='coerce', utc=True)
        if 'event_type' not in df.columns:
            df['event_type'] = 'unknown'
        df['event_type'] = df['event_type'].fillna('unknown').astype('category')
        if 'user_id' not in df.columns:
            df['user_id'] = -1
        df['user_id'] = pd.to_numeric(df['user_id'], errors='coerce').fillna(-1).astype(int)
# Extract date for partitioning
df['date'] = df['event_timestamp'].dt.date
return df, processed_in_batch
def write_to_parquet(self, df):
if df.empty:
return
# Convert to Arrow Table for efficient writing
table = pa.Table.from_pandas(df, preserve_index=False)
# This is where the magic happens. We write the data to the processed bucket,
# partitioned by 'event_type' and 'date'. This creates a directory structure
# like: s3://processed-bucket/event_type=api_error/date=2023-10-27/file.parquet
        # Note: because an explicit filesystem object is supplied, root_path is the
        # bucket (and optional prefix) without the "s3://" scheme.
        pq.write_to_dataset(
            table,
            root_path=settings.PROCESSED_BUCKET,
            partition_cols=['event_type', 'date'],
            filesystem=pa.fs.S3FileSystem(
                endpoint_override=settings.SPACES_ENDPOINT_URL,
                access_key=settings.SPACES_ACCESS_KEY,
                secret_key=settings.SPACES_SECRET_KEY,
                region='nyc3'
            )
        )
logging.info(f"Wrote {len(df)} records to partitioned Parquet dataset.")
def run(self):
# Process data from the last 2 hours to handle any ingestion delays.
now = datetime.utcnow()
prefixes_to_scan = [
(now - timedelta(hours=i)).strftime('%Y/%m/%d/%H') for i in range(2)
]
all_new_files = []
for prefix in prefixes_to_scan:
logging.info(f"Scanning for new files in prefix: {prefix}")
all_new_files.extend(list(self.list_new_files(prefix)))
if not all_new_files:
logging.info("No new files to process.")
return
logging.info(f"Found {len(all_new_files)} new files to process.")
# Process in chunks to manage memory
chunk_size = 1000
for i in range(0, len(all_new_files), chunk_size):
batch_keys = all_new_files[i:i + chunk_size]
df, processed_keys = self.process_batch(batch_keys)
if not df.empty:
self.write_to_parquet(df)
self.processed_files.update(processed_keys)
self._save_state()
logging.info("ETL run completed.")
if __name__ == "__main__":
processor = ETLProcessor()
processor.run()
A common mistake is to read all JSON files into memory at once. This works for small volumes but quickly fails. Processing in batches and using generators is key to memory efficiency. The state management here is file-based for simplicity, but a more robust solution might use a small SQLite database or a key-value store.
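As a sketch of that more robust option, a SQLite-backed state store takes only a few lines of standard-library code (the table and method names here are hypothetical):

import sqlite3

class SqliteState:
    """Minimal sketch of a SQLite-backed replacement for the flat state file."""

    def __init__(self, path="processed_files.db"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS processed (key TEXT PRIMARY KEY, processed_at TEXT)"
        )

    def is_processed(self, key):
        row = self.conn.execute(
            "SELECT 1 FROM processed WHERE key = ?", (key,)
        ).fetchone()
        return row is not None

    def mark_processed(self, keys):
        self.conn.executemany(
            "INSERT OR IGNORE INTO processed (key, processed_at) VALUES (?, datetime('now'))",
            [(k,) for k in keys],
        )
        self.conn.commit()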
Stage 3: The Query API
With the data neatly organized in a partitioned Parquet dataset, we need an API to query it. This Python service, built with FastAPI for its performance and automatic documentation, will be the bridge between our data lake and the frontend. It will not use a traditional database. Instead, it will leverage `pyarrow`’s dataset capabilities to read directly from Spaces, applying filters at the storage layer whenever possible.
`query_api/main.py`:
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
from datetime import date
from functools import reduce
import operator
import pandas as pd
import pyarrow.dataset as ds
import pyarrow.fs as fs
import os
app = FastAPI()
# Configuration should be loaded from environment variables
S3_FILESYSTEM = fs.S3FileSystem(
endpoint_override=os.environ.get('SPACES_ENDPOINT_URL'),
access_key=os.environ.get('SPACES_ACCESS_KEY'),
secret_key=os.environ.get('SPACES_SECRET_KEY'),
region='nyc3'
)
PROCESSED_BUCKET = os.environ.get('PROCESSED_BUCKET', "company-telemetry-processed")
class QueryResponse(BaseModel):
data: List[Dict[str, Any]]
count: int
message: Optional[str] = None
@app.get("/query", response_model=QueryResponse)
async def perform_query(
start_date: date,
end_date: date,
event_type: str = Query(..., description="The type of event to query, e.g., 'frontend_error'"),
user_id: Optional[int] = None,
limit: int = Query(100, ge=1, le=1000)
):
"""
Queries the Parquet data lake.
"""
filters = []
# --- Filter Pushdown ---
# This is the most important optimization. By specifying filters here,
# pyarrow can leverage the partitioning scheme. It will only look inside
# directories matching event_type and the date range.
# 1. Partition filter for event_type
filters.append(ds.field('event_type') == event_type)
# 2. Partition filter for date range
# A pitfall is creating a filter for every day. It's more efficient
# to create a range filter if the underlying library supports it.
filters.append(ds.field('date') >= start_date)
filters.append(ds.field('date') <= end_date)
# 3. Columnar filter for other fields
if user_id is not None:
filters.append(ds.field('user_id') == user_id)
    # Combine all filter expressions into a single predicate with logical AND.
    combined_filter = reduce(operator.and_, filters)
try:
        # Pass the bucket name without the "s3://" scheme because an explicit
        # filesystem is supplied. "hive" partitioning matches the key=value
        # directory layout produced by the ETL job's write_to_dataset call.
        dataset = ds.dataset(
            PROCESSED_BUCKET,
            filesystem=S3_FILESYSTEM,
            format="parquet",
            partitioning="hive"
        )
scanner = dataset.scanner(filter=combined_filter, batch_size=100_000)
# The query is executed here. We convert to a Pandas DataFrame for easy serialization.
table = scanner.to_table()
df = table.to_pandas()
# Sort and limit results before sending to the client
df_sorted = df.sort_values(by='_server_timestamp_utc', ascending=False)
result_df = df_sorted.head(limit)
# Convert NaT/NaN to None for valid JSON
result_df = result_df.where(pd.notnull(result_df), None)
return {
"data": result_df.to_dict('records'),
"count": len(result_df)
}
except Exception as e:
# In production, log the full exception
raise HTTPException(status_code=500, detail=f"Error querying data lake: {str(e)}")
The key piece of code is `ds.dataset` with the `partitioning` argument. This tells PyArrow about our directory structure. When we build a filter on `event_type` or `date`, PyArrow is smart enough to translate that into a list of specific directories to scan, dramatically pruning the search space. This is filter pushdown in action, and it’s what makes this “database-less” approach viable.
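One way to verify the pruning is to ask the dataset which file fragments a filter actually selects. A minimal sketch, assuming the same `dataset` and `combined_filter` objects from the query handler above:

# Only files under the matching event_type=.../date=... directories should be listed.
for fragment in dataset.get_fragments(filter=combined_filter):
    print(fragment.path)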
Stage 4: The Webpack Frontend
The final piece is the user interface. We need a simple dashboard page where developers can select a date range, event type, and other filters to view the telemetry data. This is built as a new route within our existing React application, bundled by Webpack.
For production builds, the Webpack configuration is tuned for performance.
`webpack.prod.config.js` (excerpt):
// ... other configurations
const path = require('path');
const TerserPlugin = require('terser-webpack-plugin');
const CssMinimizerPlugin = require('css-minimizer-webpack-plugin');
module.exports = {
mode: 'production',
entry: {
main: './src/index.js',
observability: './src/observability/index.js', // Separate entry for the dashboard
},
output: {
filename: '[name].[contenthash].js',
path: path.resolve(__dirname, 'dist'),
clean: true,
},
optimization: {
// This splits vendor code (react, etc.) from our app code,
// improving caching for repeat visitors.
splitChunks: {
chunks: 'all',
},
// Minimizes JS and CSS
minimizer: [new TerserPlugin(), new CssMinimizerPlugin()],
},
// ... loaders for babel, css, etc.
};
The critical part is creating a separate entry point (`observability`) and using `splitChunks`. This ensures that users visiting the main application don’t have to download the code for the observability dashboard, which they might rarely use.
A React component for the dashboard might look like this:
`src/observability/Dashboard.jsx`:
import React, { useState, useEffect, useCallback } from 'react';
// A real implementation would use a more robust data grid component
function ResultsTable({ data }) {
if (!data || data.length === 0) {
return <p>No results found.</p>;
}
const headers = Object.keys(data[0]);
return (
<table>
<thead>
<tr>
{headers.map(h => <th key={h}>{h}</th>)}
</tr>
</thead>
<tbody>
{data.map((row, i) => (
<tr key={i}>
{headers.map(h => <td key={h}>{JSON.stringify(row[h])}</td>)}
</tr>
))}
</tbody>
</table>
);
}
export function ObservabilityDashboard() {
const [results, setResults] = useState(null);
const [loading, setLoading] = useState(false);
const [error, setError] = useState('');
// Form state
const [startDate, setStartDate] = useState('2023-10-26');
const [endDate, setEndDate] = useState('2023-10-27');
const [eventType, setEventType] = useState('frontend_error');
const fetchData = useCallback(async () => {
setLoading(true);
setError('');
const params = new URLSearchParams({
start_date: startDate,
end_date: endDate,
event_type: eventType,
limit: 100
});
try {
// The API endpoint would be proxied in development via webpack.devServer.js
const response = await fetch(`/api/query?${params.toString()}`);
if (!response.ok) {
throw new Error(`API Error: ${response.statusText}`);
}
const data = await response.json();
setResults(data.data);
} catch (err) {
setError(err.message);
setResults(null);
} finally {
setLoading(false);
}
}, [startDate, endDate, eventType]);
return (
<div>
<h1>Telemetry Query Interface</h1>
{/* Form controls for start_date, end_date, event_type etc. would go here */}
<button onClick={fetchData} disabled={loading}>
{loading ? 'Querying...' : 'Run Query'}
</button>
{error && <div style={{ color: 'red' }}>{error}</div>}
{results && <ResultsTable data={results} />}
</div>
);
}
This completes the loop. A developer can now load this page, set filters, and get back structured telemetry data, all powered by our budget-friendly data lake on DigitalOcean.
The primary limitation of this architecture is that it is near-real-time at best, not real-time. There is an inherent delay dictated by the ETL job’s schedule. For incident response, an hour-long delay might be too long. Furthermore, the query performance, while vastly superior to scanning JSON, is still dependent on a single API instance downloading and processing data from Spaces. It cannot compete with distributed query engines like Presto or ClickHouse for complex analytical queries over massive datasets. Future iterations could explore using DuckDB within the Python API for more powerful, in-process SQL queries on the Parquet files, or introducing a stream processing element with a tool like Kafka if real-time needs become paramount. The system’s applicability boundary lies where query latency measured in minutes is unacceptable, or where ad-hoc analytical queries become more important than simple filtered lookups.
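As a taste of the DuckDB direction, here is a hedged sketch of an in-process SQL query over the same partitioned Parquet data in Spaces; the endpoint settings and credentials are illustrative, not a verified configuration for this exact setup.

import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs; LOAD httpfs;")
# Point DuckDB's S3 support at Spaces instead of AWS (values illustrative).
con.execute("SET s3_endpoint='nyc3.digitaloceanspaces.com'")
con.execute("SET s3_region='nyc3'")
con.execute("SET s3_access_key_id='...'")
con.execute("SET s3_secret_access_key='...'")

rows = con.execute("""
    SELECT event_type, count(*) AS events, count(DISTINCT user_id) AS users
    FROM read_parquet('s3://company-telemetry-processed/*/*/*.parquet', hive_partitioning=1)
    WHERE date >= '2023-10-20'
    GROUP BY event_type
    ORDER BY events DESC
""").fetchall()
print(rows)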