The initial requirement seemed straightforward: map the ownership and directorship structures of a portfolio of private companies. The data exists in public registries, but it’s fragmented across dozens of poorly designed, JavaScript-heavy government portals. Simple HTTP requests with parsers like BeautifulSoup were immediately non-viable. The information we needed was rendered client-side, hidden behind button clicks, and paginated in dynamically loaded tables. This wasn’t a static scraping job; it was an automation and data modeling problem that demanded a more robust architecture.
Our first attempt involved a series of scripts, but it quickly devolved into a maintenance nightmare. A scraper for one jurisdiction would break when the site’s JavaScript framework was updated. There was no state management, leading to redundant scraping and hammering of the source servers. Most importantly, storing the extracted relationships in a relational database like PostgreSQL became a tangled mess of self-referencing foreign keys and recursive queries that performed abysmally as the network depth increased. We weren’t just storing entities; we were trying to capture a complex, multi-layered graph. The pain of writing and maintaining those recursive SQL CTEs was the final catalyst for a complete architectural rethink.
The core concept shifted from a collection of scripts to a resilient, stateful data ingestion pipeline. The system needed three distinct capabilities:
- Deep Web Interaction: A component capable of programmatically controlling a real web browser to handle any client-side logic thrown at it.
- Data Transformation and Modeling: A robust layer to cleanse the scraped data and map it onto a graph structure of nodes (Companies, People) and edges (Ownership, Directorships).
- Graph-Native Storage: A database optimized for storing and querying highly interconnected data, allowing for efficient traversal of complex ownership chains.
This led to a technology selection centered on a specific combination of tools. For browser automation, Puppeteer (via a Python port) was chosen over Selenium. Its tighter integration with the Chrome DevTools Protocol often results in faster and more stable execution, which is critical for a pipeline that will run thousands of tasks. Python, with its asyncio
library, was the natural choice for the orchestration layer, allowing us to manage concurrent browser instances efficiently. Finally, Neo4j was selected as the storage backend. Its property graph model is a one-to-one match for our problem domain, and its Cypher query language is purpose-built for the kind of path-finding queries we needed to run, such as identifying ultimate beneficial owners or detecting circular ownership structures.
The final architecture can be visualized as a multi-stage, asynchronous pipeline.
graph TD A[State Manager: Target Queue] --> B{Python Orchestrator}; B -- next_target --> C[Browser Pool Manager]; C -- get_browser --> D[Puppeteer Scraper Instance]; D -- raw_html/json --> E[Data Transformer]; E -- Nodes & Edges --> F[Neo4j Ingestion Client]; F -- Cypher Batch MERGE --> G[(Neo4j Database)]; D -- success/failure --> B; B -- update_status --> A; subgraph "Async Worker" D E F end
Part 1: The Resilient Scraper Module
In a real-world project, scrapers fail. They fail due to network timeouts, unexpected changes in website layout, or the appearance of CAPTCHAs. Building a resilient scraper means anticipating failure and handling it gracefully. We encapsulated the browser management logic into a dedicated class. The choice here is a Python port of Puppeteer; pyppeteer-fork
is a community-maintained option that works well.
# scraper/browser_manager.py
import asyncio
import logging
from typing import Optional
from pyppeteer_fork import launch, Browser
from pyppeteer_fork.errors import PyppeteerError
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class BrowserManager:
"""Manages a singleton browser instance for the application."""
_browser: Optional[Browser] = None
_lock = asyncio.Lock()
@classmethod
async def get_browser(cls) -> Browser:
"""
Provides a singleton browser instance.
Launches a new browser if one doesn't exist.
"""
async with cls._lock:
if cls._browser is None or not cls._browser.isConnected():
logging.info("No active browser instance found. Launching a new one.")
try:
# In production, you'd configure this heavily.
# For example, to use a proxy, you would add:
# args=['--proxy-server=http://your-proxy-address:port']
# Headless=False is useful for debugging locally.
cls._browser = await launch(
headless=True,
args=[
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-accelerated-2d-canvas',
'--disable-gpu'
],
# autoClose=False is important to prevent the browser from closing
# when a single page context closes.
autoClose=False
)
logging.info("New browser instance launched successfully.")
except PyppeteerError as e:
logging.error(f"Failed to launch browser: {e}")
raise
return cls._browser
@classmethod
async def close_browser(cls):
"""Closes the singleton browser instance if it exists."""
async with cls._lock:
if cls._browser and cls._browser.isConnected():
logging.info("Closing browser instance.")
await cls._browser.close()
cls._browser = None
The pitfall here is managing the browser lifecycle. A common mistake is to launch a new browser for every single scraping task. This is incredibly inefficient due to the high startup overhead. A manager class that provides a persistent, singleton browser instance which can then open multiple pages (tabs) concurrently is far more performant. The asyncio.Lock
is crucial to prevent race conditions when multiple coroutines try to initialize the browser simultaneously.
The actual scraping logic needs robust error handling and retry mechanisms. We used the tenacity
library to wrap our core scraping functions with exponential backoff.
# scraper/company_scraper.py
import asyncio
import logging
from typing import Dict, Any, List
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from pyppeteer_fork.errors import TimeoutError, NetworkError
from pydantic import BaseModel, Field
from scraper.browser_manager import BrowserManager
# Pydantic models provide data validation right at the source.
class ScrapedPerson(BaseModel):
name: str
position: str
class ScrapedSubsidiary(BaseModel):
name: str
ownership_percentage: float = Field(..., ge=0, le=100)
class ScrapedCompanyData(BaseModel):
company_name: str
registration_number: str
directors: List[ScrapedPerson]
subsidiaries: List[ScrapedSubsidiary]
RETRYABLE_ERRORS = (TimeoutError, NetworkError, ConnectionResetError)
class CompanyScraper:
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(RETRYABLE_ERRORS),
reraise=True # Reraise the exception after the final attempt
)
async def scrape(self, url: str) -> ScrapedCompanyData:
"""
Scrapes a single company's data from a given URL.
This function is decorated with a retry mechanism.
"""
browser = await BrowserManager.get_browser()
page = None
try:
page = await browser.newPage()
# Set a realistic user agent
await page.setUserAgent(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
)
logging.info(f"Navigating to {url}...")
await page.goto(url, {'waitUntil': 'networkidle2', 'timeout': 30000})
# --- This is where the site-specific scraping logic resides ---
# The following is a conceptual example. A real implementation
# would have complex selectors and interaction logic.
await page.waitForSelector('#company-details', {'timeout': 10000})
# Example: Click a tab to reveal directors
await page.click('#directors-tab')
await page.waitForSelector('.director-info', {'timeout': 5000})
# Using page.evaluate to run JavaScript in the browser context
# is often more reliable than trying to parse with Python-side tools.
company_details = await page.evaluate('''() => {
const name = document.querySelector('h1.company-name').innerText.trim();
const regNum = document.querySelector('.reg-number').innerText.split(':')[1].trim();
return { name, regNum };
}''')
directors_data = await page.evaluate('''() => {
const directors = [];
document.querySelectorAll('.director-info').forEach(el => {
directors.push({
name: el.querySelector('.name').innerText,
position: el.querySelector('.position').innerText,
});
});
return directors;
}''')
# Imagine another tab for subsidiaries
await page.click('#subsidiaries-tab')
await page.waitForSelector('.subsidiary-row', {'timeout': 5000})
subsidiaries_data = await page.evaluate('''() => {
const subsidiaries = [];
document.querySelectorAll('.subsidiary-row').forEach(el => {
subsidiaries.push({
name: el.querySelector('.sub-name').innerText,
ownership_percentage: parseFloat(el.querySelector('.ownership').innerText),
});
});
return subsidiaries;
}''')
# --- End of site-specific logic ---
return ScrapedCompanyData(
company_name=company_details['name'],
registration_number=company_details['regNum'],
directors=[ScrapedPerson(**d) for d in directors_data],
subsidiaries=[ScrapedSubsidiary(**s) for s in subsidiaries_data]
)
except Exception as e:
logging.error(f"An error occurred while scraping {url}: {e}")
# Take a screenshot on error for debugging
if page:
await page.screenshot({'path': f'error_{url.replace("/", "_")}.png'})
raise
finally:
if page:
await page.close()
Part 2: Transformation and Idempotent Ingestion
Raw scraped data is messy. The transformation layer’s job is to convert this raw data into a clean, structured set of graph elements (nodes and properties, relationships and properties). A critical design decision here is to ensure the ingestion process is idempotent. If we run the same scraper twice, it must not create duplicate entities in our graph. This is achieved by using unique business identifiers (like a company registration number) and leveraging Neo4j’s MERGE
clause.
The GraphRepository
class abstracts all interactions with Neo4j.
# database/graph_repository.py
import logging
from neo4j import AsyncGraphDatabase, AsyncDriver
from typing import List, Dict, Any
from scraper.company_scraper import ScrapedCompanyData
class GraphRepository:
def __init__(self, uri, user, password):
self._driver: AsyncDriver = AsyncGraphDatabase.driver(uri, auth=(user, password))
async def close(self):
await self._driver.close()
async def add_company_structure(self, data: ScrapedCompanyData):
"""
Ingests a company's scraped data into Neo4j using an idempotent query.
This uses `MERGE` to avoid creating duplicate nodes and relationships.
"""
# A common mistake is to run one query per node/relationship. This is very slow.
# The correct approach is to batch operations into a single, parameterized query
# using UNWIND. This minimizes network round-trips and leverages the database's
# transaction engine much more efficiently.
# Prepare parameters for the query
params = {
"company_name": data.company_name,
"company_reg": data.registration_number,
"directors": [d.dict() for d in data.directors],
"subsidiaries": [s.dict() for s in data.subsidiaries]
}
# This Cypher query is the heart of the ingestion logic.
query = """
// 1. Merge the parent company node
MERGE (c:Company {registration_number: $company_reg})
ON CREATE SET c.name = $company_name, c.scraped_at = timestamp()
ON MATCH SET c.name = $company_name, c.last_updated = timestamp()
// 2. Unwind the list of directors and merge them as Person nodes
// and create DIRECTOR_OF relationships.
WITH c
UNWIND $directors AS director_data
MERGE (p:Person {name: director_data.name})
MERGE (p)-[r:DIRECTOR_OF]->(c)
ON CREATE SET r.position = director_data.position, r.created_at = timestamp()
ON MATCH SET r.position = director_data.position
// 3. Unwind the list of subsidiaries and merge them as Company nodes
// and create OWNS relationships.
WITH c
UNWIND $subsidiaries AS subsidiary_data
MERGE (s:Company {name: subsidiary_data.name})
MERGE (c)-[r:OWNS]->(s)
ON CREATE SET r.percentage = subsidiary_data.ownership_percentage, r.created_at = timestamp()
ON MATCH SET r.percentage = subsidiary_data.ownership_percentage
"""
try:
async with self._driver.session() as session:
await session.run(query, params)
logging.info(f"Successfully ingested data for company: {data.company_name}")
except Exception as e:
logging.error(f"Failed to write to Neo4j for company {data.company_name}: {e}")
raise
The UNWIND
clause is a game-changer for performance here. It takes a list from the parameters (e.g., $directors
) and iterates over it, executing the subsequent MERGE
clauses for each item within a single transaction. This is orders of magnitude faster than sending dozens of individual MERGE
statements from the Python client. The ON CREATE
and ON MATCH
sub-clauses allow us to control what happens when a node/relationship is first created versus when it’s simply found, making it easy to track timestamps for data freshness.
Part 3: Orchestration and State Management
With the scraping and ingestion components built, the final piece is the orchestrator that ties them together. It needs to manage a queue of targets to scrape, distribute work to a pool of concurrent workers, and update the status of each target. For this implementation, we’ll use a simple in-memory queue, but in a production system, this would be replaced by a persistent message queue like RabbitMQ or a database table.
# main.py
import asyncio
import logging
import os
from typing import List
from scraper.browser_manager import BrowserManager
from scraper.company_scraper import CompanyScraper
from database.graph_repository import GraphRepository
# Load configuration from environment variables for production readiness.
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "password")
# The maximum number of concurrent scraping tasks.
# This should be tuned based on system resources (CPU/Memory).
MAX_CONCURRENT_TASKS = 5
class Orchestrator:
def __init__(self, target_urls: List[str]):
self.target_queue = asyncio.Queue()
for url in target_urls:
self.target_queue.put_nowait(url)
self.scraper = CompanyScraper()
self.repository = GraphRepository(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)
self.results = []
async def worker(self, name: str):
"""A worker that pulls URLs from the queue and processes them."""
while not self.target_queue.empty():
url = await self.target_queue.get()
logging.info(f"Worker {name} starting task for {url}")
try:
scraped_data = await self.scraper.scrape(url)
await self.repository.add_company_structure(scraped_data)
self.results.append({'url': url, 'status': 'SUCCESS'})
except Exception as e:
logging.error(f"Worker {name} failed task for {url}: {type(e).__name__}")
self.results.append({'url': url, 'status': 'FAILED', 'reason': str(e)})
finally:
self.target_queue.task_done()
async def run(self):
"""Initializes resources and runs the orchestration logic."""
# It's critical to manage resources like browser and DB connections properly.
# They should be initialized once and closed gracefully.
await BrowserManager.get_browser()
workers = [
asyncio.create_task(self.worker(f"worker-{i}"))
for i in range(MAX_CONCURRENT_TASKS)
]
# Wait for the queue to be fully processed
await self.target_queue.join()
# Cancel any lingering worker tasks
for w in workers:
w.cancel()
await asyncio.gather(*workers, return_exceptions=True)
# Graceful shutdown
await BrowserManager.close_browser()
await self.repository.close()
logging.info("Orchestration finished.")
logging.info(f"Results: {self.results}")
async def main():
# In a real system, these URLs would come from a database or API.
targets = [
"http://example-registry.com/company/12345",
"http://example-registry.com/company/67890",
"http://example-registry.com/company/abcde",
# ... more URLs
]
orchestrator = Orchestrator(target_urls=targets)
await orchestrator.run()
if __name__ == "__main__":
# A simple test to demonstrate running the orchestration.
# In a real app, this would be part of a larger service or CLI.
# We would need to replace the example URLs with real, scrapable ones.
# Due to the conceptual nature of the scraper logic, running this directly
# will fail on page.evaluate selectors. The architecture, however, is sound.
# asyncio.run(main())
print("To run this, replace conceptual scraper logic and URLs in `main.py` and uncomment the `asyncio.run` line.")
The orchestrator uses an asyncio.Queue
and a fixed number of worker coroutines. This pattern provides a simple but effective way to control concurrency, preventing the system from overwhelming either the local machine’s resources by launching too many browsers or the target servers with too many requests.
Once populated, the graph unlocks powerful analytical capabilities that were previously impossible. A query to find the ultimate beneficial owners of a company (i.e., the people who control it through chains of other companies) becomes trivial:
// Find all persons who ultimately control 'Target Company LLC'
// through any number of ownership layers.
MATCH (p:Person)-[:DIRECTOR_OF*1..]->(c:Company)-[:OWNS*1..]->(t:Company {name: 'Target Company LLC'})
RETURN DISTINCT p.name AS UltimateBeneficialOwner, c.name as ControlledEntity;
This architecture, combining Puppeteer’s deep interaction capabilities with Python’s asynchronous orchestration and Neo4j’s graph-native storage, provides a scalable and resilient solution for extracting and analyzing complex relational data from the web. The implementation details—browser pooling, idempotent data ingestion with MERGE
and UNWIND
, and explicit state management—are not optional extras; they are fundamental requirements for building a system that can withstand the rigors of a production environment.
The current system’s primary limitation lies in its simplistic state and queue management. An in-memory queue is not persistent; if the process crashes, all in-flight and pending tasks are lost. A future iteration would replace the asyncio.Queue
with a durable system like RabbitMQ or Redis Streams. This would also allow the architecture to scale across multiple machines, with each machine running an orchestrator process that pulls from the same central queue. Furthermore, the scraper logic is still monolithic; a more advanced design would use a strategy pattern, where different scraper implementations could be dynamically selected based on the target URL’s domain, allowing the system to handle dozens of different website layouts without becoming a single, unmaintainable file.