The initial problem wasn’t a technical marvel; it was a bog-standard operational headache. Our data ingestion pipeline was a collection of disparate Python scripts, cron jobs, and manual interventions. It was fragile, impossible to monitor, and bled Snowflake compute credits by keeping a warehouse active far longer than necessary. Each new data source required bespoke code, and failures were silent, often discovered days later during a business review. We needed a centralized, observable, and cost-efficient ELT orchestrator. Our team’s expertise was in Node.js and TypeScript, and our methodology was Scrum. This is the post-mortem of how we built that system, not in one grand design, but through a series of iterative sprints that uncovered the real-world complexities of the task.
Sprint 1: The Spike - A Proof of Viability
Our first sprint goal was simple: prove that Node.js could reliably connect to Snowflake and execute a basic COPY INTO
command to load a single file from an S3 stage. This was a technical spike, intended to de-risk our choice of technology. In a real-world project, you start with the core transaction and build outwards.
The initial piece of code was straightforward, using the official snowflake-sdk
.
// src/services/snowflake-connector.ts
import snowflake from 'snowflake-sdk';
import { promisify } from 'util';
// A common mistake is to create new connections for every query.
// A production system needs a robust connection pool. For this spike,
// a singleton connection is sufficient to prove the concept.
let connection: snowflake.Connection;
export function connectToSnowflake(): Promise<snowflake.Connection> {
if (connection && !connection.isUp()) {
// Handle unexpected disconnections
console.warn('Snowflake connection is down, attempting to reconnect.');
connection = null; // Force re-creation
}
if (!connection) {
connection = snowflake.createConnection({
account: process.env.SNOWFLAKE_ACCOUNT!,
username: process.env.SNOWFLAKE_USER!,
password: process.env.SNOWFLAKE_PASSWORD!,
warehouse: process.env.SNOWFLAKE_WAREHOUSE!,
database: process.env.SNOWFLAKE_DATABASE!,
schema: process.env.SNOWFLAKE_SCHEMA!,
});
const connectAsync = promisify(connection.connect).bind(connection);
return connectAsync()
.then(() => {
console.log('Successfully connected to Snowflake.');
return connection;
})
.catch(err => {
console.error('Error connecting to Snowflake:', err.message);
throw err;
});
}
return Promise.resolve(connection);
}
export async function executeSql(sqlText: string, binds: any[] = []): Promise<any[]> {
const conn = await connectToSnowflake();
return new Promise((resolve, reject) => {
conn.execute({
sqlText,
binds,
complete: (err, stmt, rows) => {
if (err) {
// The pitfall here is that the error object is verbose.
// Logging just err.message is often more useful initially.
return reject(new Error(`Failed to execute statement: ${err.message}`));
}
resolve(rows || []);
},
});
});
}
With the connection logic in place, the first attempt at a loading script was a naive loop.
// Initial POC - Do not use in production
import { executeSql } from './services/snowflake-connector';
const filesToLoad = [
's3://my-data-bucket/raw/events/2023/10/26/file1.json.gz',
's3://my-data-bucket/raw/events/2023/10/26/file2.json.gz',
];
async function loadFilesNaively() {
console.log('Starting naive file load...');
for (const filePath of filesToLoad) {
const sql = `
COPY INTO raw_events
FROM '@my_s3_stage/${filePath.split('raw/')[1]}'
FILE_FORMAT = (TYPE = 'JSON')
ON_ERROR = 'SKIP_FILE';
`;
try {
console.log(`Loading file: ${filePath}`);
await executeSql(sql);
console.log(`Successfully loaded ${filePath}`);
} catch (error) {
console.error(`Failed to load ${filePath}:`, error);
}
}
console.log('Naive file load finished.');
}
Sprint Review Outcome: Success, but with glaring issues. It worked, proving the Node.js stack was viable. However, the sequential processing was unacceptably slow. Loading a hundred files would take a hundred times longer than loading one. The immediate backlog item for the next sprint was clear: introduce concurrency.
Sprint 2: Concurrency, Backpressure, and the Promise.all
Trap
The obvious solution to the sequential bottleneck was Promise.all
. This is a classic junior developer mistake. In our retro, we called this the “optimism-driven development” phase.
// Second attempt - This will cause problems
async function loadFilesWithUncontrolledConcurrency() {
console.log('Starting uncontrolled concurrent load...');
const loadPromises = filesToLoad.map(filePath => {
const sql = `COPY INTO raw_events FROM '@my_s3_stage/${filePath.split('raw/')[1]}' ...`;
return executeSql(sql).catch(err => console.error(`Failed ${filePath}`, err));
});
await Promise.all(loadPromises);
console.log('Uncontrolled load finished.');
}
Running this against a hundred files immediately resulted in a flood of 429: Too Many Requests
errors from Snowflake, or worse, queries queuing up behind the scenes, leading to unpredictable load times and warehouse contention. The system had no backpressure.
The pragmatic solution was to build a simple task runner that would process jobs from a queue with a fixed concurrency limit. This prevents overwhelming the downstream service and provides more predictable performance.
// src/utils/task-runner.ts
import { EventEmitter } from 'events';
type AsyncTask<T> = () => Promise<T>;
export class TaskRunner<T> extends EventEmitter {
private queue: AsyncTask<T>[] = [];
private activeTasks = 0;
constructor(private concurrency: number) {
super();
if (concurrency < 1) {
throw new Error('Concurrency must be at least 1.');
}
}
public add(task: AsyncTask<T>): void {
this.queue.push(task);
this.runNext();
}
private runNext(): void {
// This is the core backpressure mechanism. Only run if a worker is free.
if (this.activeTasks >= this.concurrency || this.queue.length === 0) {
return;
}
this.activeTasks++;
const task = this.queue.shift()!;
task()
.then(result => this.emit('taskCompleted', result))
.catch(error => this.emit('taskFailed', error))
.finally(() => {
this.activeTasks--;
// After a task finishes, check if more work needs to be done.
this.runNext();
// If the queue is empty and all tasks are done, we are finished.
if (this.activeTasks === 0 && this.queue.length === 0) {
this.emit('drained');
}
});
}
public onDrained(): Promise<void> {
return new Promise((resolve) => {
if (this.activeTasks === 0 && this.queue.length === 0) {
return resolve();
}
this.once('drained', () => resolve());
});
}
}
Sprint Review Outcome: A major step forward in stability. We could now process thousands of files without crashing the system. We established a concurrency limit of 8, which seemed to be the sweet spot for our X-SMALL
warehouse. The next obvious problem was that all our logic—file paths, table names—was hardcoded. The system was stable but completely inflexible.
Sprint 3 & 4: Metadata-Driven Orchestration and Structured Logging
To make the orchestrator useful, we needed to externalize the job definitions. This led to a metadata-driven design. A simple JSON file would define our ELT “jobs.”
// config/jobs.json
[
{
"jobName": "IngestApiEvents",
"source": {
"type": "S3",
"bucket": "my-data-bucket",
"prefix": "raw/events/",
"stageName": "my_s3_stage"
},
"target": {
"table": "raw_events",
"schema": "RAW_DATA"
},
"loadOptions": {
"fileFormat": "TYPE = 'JSON'",
"copyOptions": "ON_ERROR = 'CONTINUE' PURGE = true"
},
"enabled": true
}
]
The orchestrator’s main loop now reads this configuration, lists matching files from S3, and dynamically generates the COPY
commands. As this complexity grew, console.log
became untenable. We introduced pino
for structured, high-performance JSON logging.
// src/services/logger.ts
import pino from 'pino';
// In a real project, you'd configure transports to send logs
// to a service like Datadog, Splunk, or CloudWatch.
export const logger = pino({
level: process.env.LOG_LEVEL || 'info',
formatters: {
level: (label) => ({ level: label }),
},
// A correlation ID is essential for tracing a single job run
// through a sea of logs.
mixin() {
return { correlationId: getCorrelationId() }; // Implementation of correlation ID omitted
},
timestamp: pino.stdTimeFunctions.isoTime,
});
The main orchestration logic started to take shape, combining configuration, task running, and logging.
// src/orchestrator.ts
import { TaskRunner } from './utils/task-runner';
import { executeSql } from './services/snowflake-connector';
import { listS3Objects } from './services/s3-client'; // Assumes an S3 client exists
import { logger } from './services/logger';
import jobsConfig from './config/jobs.json';
interface Job {
jobName: string;
// ... other properties from JSON
}
async function runJob(job: Job) {
const jobLogger = logger.child({ jobName: job.jobName });
jobLogger.info('Starting job execution.');
const files = await listS3Objects(job.source.bucket, job.source.prefix);
if (files.length === 0) {
jobLogger.info('No new files found to process.');
return;
}
jobLogger.info({ fileCount: files.length }, 'Discovered files for processing.');
const taskRunner = new TaskRunner<any>(8);
let successCount = 0;
let errorCount = 0;
taskRunner.on('taskCompleted', (result) => {
successCount++;
jobLogger.debug({ result }, 'File loaded successfully.');
});
taskRunner.on('taskFailed', (error) => {
errorCount++;
jobLogger.error({ error: error.message }, 'File loading failed.');
});
for (const fileKey of files) {
taskRunner.add(() => {
// This is a closure, capturing the fileKey for the task.
const relativePath = fileKey.replace(job.source.prefix, '');
const sql = `
COPY INTO ${job.target.schema}.${job.target.table}
FROM '@${job.source.stageName}/${relativePath}'
FILE_FORMAT = (${job.loadOptions.fileFormat})
${job.loadOptions.copyOptions};
`;
jobLogger.trace({ sql }, 'Executing COPY command.');
return executeSql(sql);
});
}
await taskRunner.onDrained();
jobLogger.info(
{ successCount, errorCount },
'Job execution finished.'
);
}
export async function main() {
for (const job of jobsConfig) {
if (job.enabled) {
await runJob(job);
}
}
}
Sprint Review Outcome: The system was now flexible and observable. We could add new data sources just by modifying a JSON file. The structured logs, when piped into our analysis tool, gave us perfect visibility into which files were failing and why. But with increased usage came a new, more expensive problem: our Snowflake bill.
Sprint 5: Taming the Snowflake Bill with Programmatic Warehouse Management
We discovered our X-SMALL
warehouse was running 24/7, even though our orchestrator only ran every 15 minutes. The auto-suspend feature was set to 10 minutes, but the frequent job runs kept it “warm.” In a consumption-based model, this is just throwing money away.
The solution was to make the orchestrator responsible for the warehouse lifecycle. It would resume the warehouse at the start of its run and suspend it immediately upon completion.
// src/services/warehouse-manager.ts
import { executeSql } from './snowflake-connector';
import { logger } from './logger';
const WAREHOUSE_NAME = process.env.SNOWFLAKE_WAREHOUSE!;
// A state flag to prevent redundant RESUME calls.
let isResumed = false;
export async function ensureWarehouseActive(): Promise<void> {
if (isResumed) {
logger.debug('Warehouse already active.');
return;
}
logger.info({ warehouse: WAREHOUSE_NAME }, 'Resuming warehouse...');
const startTime = Date.now();
// Using `IF EXISTS` is a good defensive practice.
await executeSql(`ALTER WAREHOUSE IF EXISTS ${WAREHOUSE_NAME} RESUME;`);
const duration = Date.now() - startTime;
logger.info({ warehouse: WAREHOUSE_NAME, durationMs: duration }, 'Warehouse resumed.');
isResumed = true;
}
export async function suspendWarehouseIfIdle(): Promise<void> {
// In a more complex system, you might check for other active queries.
// For this orchestrator, we assume it's the sole user during its run.
logger.info({ warehouse: WAREHOUSE_NAME }, 'Suspending warehouse.');
await executeSql(`ALTER WAREHOUSE IF EXISTS ${WAREHOUSE_NAME} SUSPEND;`);
isResumed = false;
}
The main orchestrator logic was updated to wrap the entire run in these functions.
// src/orchestrator.ts (updated main function)
import { ensureWarehouseActive, suspendWarehouseIfIdle } from './services/warehouse-manager';
export async function main() {
try {
await ensureWarehouseActive();
for (const job of jobsConfig) {
if (job.enabled) {
await runJob(job);
}
}
} catch (error) {
logger.fatal({ error: error.message }, 'Orchestrator failed with a critical error.');
} finally {
// The `finally` block is crucial to ensure we always try to suspend
// the warehouse, even if the jobs fail.
await suspendWarehouseIfIdle();
}
}
Sprint Review Outcome: This was a massive financial win. Our Snowflake credit consumption for this workload dropped by over 70%. The trade-off was a small amount of latency (1-3 seconds) at the start of each run for the warehouse to resume. For our near-real-time requirements, this was perfectly acceptable. This is a classic engineering trade-off: sacrificing a small amount of performance for a huge cost saving.
Sprint 6 & 7: Idempotency, Dead-Lettering, and Testing
The system was cheap and fast, but not yet robust. A network failure during a COPY
command left us in an ambiguous state. Did the file load? If we re-ran the job, would it duplicate data? The PURGE = true
option in our COPY
command helped, as it would delete the file from S3 after a successful load, but it wasn’t foolproof.
We implemented a more robust idempotency mechanism using a metadata tracking table in Snowflake.
CREATE TABLE IF NOT EXISTS elt_logs.process_log (
log_id VARCHAR(64) PRIMARY KEY,
file_key VARCHAR NOT NULL,
job_name VARCHAR NOT NULL,
status VARCHAR(20) NOT NULL, -- e.g., 'PROCESSING', 'SUCCESS', 'FAILED'
rows_parsed BIGINT,
rows_loaded BIGINT,
error_message VARCHAR,
created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
updated_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
Before processing a file, the orchestrator would now:
- Check the
process_log
table. If the file has aSUCCESS
status, skip it. - If the file is new or has a
FAILED
status, insert a record with aPROCESSING
status. - Execute the
COPY
command. - On success, update the log record to
SUCCESS
along with statistics from theCOPY
result. - On failure, update the log record to
FAILED
with the error message.
This required transactional control, which complicated our simple executeSql
function, but it was a necessary complexity for ensuring data integrity.
Finally, as the codebase grew, we introduced Jest
for testing. Unit testing the configuration parser and task runner was simple. The more interesting challenge was integration testing without hitting Snowflake. We used jest.mock
to mock our snowflake-connector
module.
// tests/orchestrator.test.ts
import { main as runOrchestrator } from '../src/orchestrator';
import * as s3Client from '../src/services/s3-client';
import * as snowflake from '../src/services/snowflake-connector';
// Mock the entire snowflake connector module
jest.mock('../src/services/snowflake-connector');
const mockExecuteSql = snowflake.executeSql as jest.Mock;
describe('Orchestrator', () => {
beforeEach(() => {
// Reset mocks before each test
mockExecuteSql.mockClear();
});
it('should generate correct COPY SQL based on job config', async () => {
// Mock S3 to return a single file
jest.spyOn(s3Client, 'listS3Objects').mockResolvedValueOnce([
'raw/events/file1.json.gz'
]);
// Mock SQL execution to resolve immediately
mockExecuteSql.mockResolvedValue([]);
await runOrchestrator();
// We expect 3 calls: RESUME, COPY, SUSPEND
expect(mockExecuteSql).toHaveBeenCalledTimes(3);
const copyCall = mockExecuteSql.mock.calls[1][0];
expect(copyCall).toContain('COPY INTO RAW_DATA.raw_events');
expect(copyCall).toContain("@my_s3_stage/file1.json.gz");
expect(copyCall).toContain("ON_ERROR = 'CONTINUE' PURGE = true");
});
});
This allowed us to verify our SQL generation logic and the warehouse management calls without incurring cost or network dependency, drastically speeding up our CI/CD pipeline.
Final Architecture
The system we ended up with was a reflection of our iterative journey. It was a metadata-driven, concurrent, observable, cost-aware, and robust ELT orchestrator.
graph TD subgraph AWS S3[S3 Bucket with raw files] end subgraph "Node.js Application (ECS/EC2)" Orchestrator[Orchestrator Process] Config[jobs.json] TaskRunner[Task Runner Concurrency=8] Logger[Pino Structured Logger] end subgraph Snowflake Warehouse[Virtual Warehouse] Stage[S3 Stage] LogTable[elt_logs.process_log] TargetTable[Target Table] end subgraph Observability LogSink[Log Aggregator e.g., Datadog] end S3 -- "1. listS3Objects()" --> Orchestrator Config -- "2. Reads Config" --> Orchestrator Orchestrator -- "3. ensureWarehouseActive()" --> Warehouse Orchestrator -- "4. Checks/Inserts" --> LogTable Orchestrator -- "5. Dispatches Tasks" --> TaskRunner TaskRunner -- "6. COPY INTO ... FROM @Stage" --> Warehouse Stage -- "Reads file" --> S3 Warehouse -- "7. Updates Status" --> LogTable Warehouse -- "Loads data" --> TargetTable Orchestrator -- "8. suspendWarehouseIfIdle()" --> Warehouse Logger -- "Streams JSON logs" --> LogSink
The iterative process dictated by Scrum was not just a management framework; it was an essential part of the technical discovery process. We didn’t know our biggest problem would be cost until we had a working, observable system. We didn’t realize the need for a robust idempotency layer until we experienced a real-world failure.
The current implementation still has limitations. It operates on a polling schedule, which could be converted to a more responsive event-driven architecture using S3 Event Notifications and an SQS queue. The static concurrency limit could be made dynamic, adjusting to the current load on the Snowflake warehouse. Secret management is handled via environment variables, which is functional but should be migrated to a dedicated service like AWS Secrets Manager for better security and rotation policies. The orchestrator itself is a single point of failure; containerizing it and deploying it to a managed service like ECS or Kubernetes with automated restarts would be the next step in its maturity.