Declarative Management of Snowflake Resources Using a Crossplane and GitHub Actions Control Plane


The request came through as it always does: a new microservice needs its own sandboxed database in Snowflake for analytics. The process was a familiar, painful ritual involving Jira tickets, manual approvals, and a DBA running a script, which took anywhere from two hours to two days. This friction was a significant drag on developer velocity, and the lack of a clear audit trail for who requested what, and which permissions were granted, was a looming compliance nightmare. We needed a fully declarative, GitOps-driven system where a developer could define their database needs in a YAML file, commit it to Git, and have a fully configured Snowflake environment with credentials provisioned automatically.

Our platform is built on Kubernetes, and our entire ethos revolves around declarative configuration managed via Git. The natural entry point for any new piece of infrastructure is the Kubernetes API. This led us directly to Crossplane. It allows us to extend the Kubernetes API to manage external resources, presenting them as native-seeming objects. Instead of learning a new tool, our developers could simply define a SnowflakeDatabase object just like they would a Deployment or Service.

However, a standard Crossplane provider for Snowflake, even if one existed that met our needs, wouldn’t be enough. We had specific business logic to enforce: naming conventions, default role grants, user creation tied to the resource, and, most importantly, the secure delivery of credentials back into the requesting team’s Kubernetes namespace. A simple 1:1 mapping of a CRD to a Snowflake resource was insufficient. This required a custom control plane—a reconciliation loop that would enact our specific business logic.

Building a full-blown Kubernetes operator from scratch in Go felt like overkill for this initial implementation. The learning curve is steep, and the maintenance overhead is significant. Our CI/CD and automation are standardized on GitHub Actions. The question then became: could we implement the reconciliation logic for our Crossplane resources within a GitHub Actions workflow? This approach, while unconventional, would allow us to leverage our existing tooling and expertise, moving faster while accepting certain trade-offs. The Snowflake SQL API, a RESTful interface for executing SQL queries, would be the final piece, allowing us to interact with Snowflake from any environment that can make an HTTP request, a perfect fit for a GitHub Actions runner.

The architecture crystallized:

  1. Declarative API: A Crossplane CompositeResourceDefinition (XRD) defines the schema for a CompositeSnowflakeDatabase. This is the user-facing contract.
  2. GitOps Engine: ArgoCD (or Flux) syncs the user’s YAML manifest from a Git repository to the Kubernetes cluster, creating an instance of our custom resource.
  3. Reconciliation Loop: A scheduled GitHub Actions workflow periodically queries the Kubernetes cluster for these custom resources.
  4. Execution Logic: The workflow executes a script that calls the Snowflake SQL API to create databases, users, and roles, and then creates a Kubernetes Secret with the new credentials in the appropriate namespace.

Phase 1: Defining the Declarative API with Crossplane

Everything starts with the contract. We need to define what a developer can and must specify. We settled on a CompositeResourceDefinition (XRD) that exposed a minimal, user-friendly set of parameters while hiding the underlying complexity.

In a real-world project, you avoid giving users too many knobs to turn. We enforce naming conventions and ownership at the platform level.

Here is the complete XRD for compositesnowflakedatabases.platform.acme.com:

apiVersion: apiextensions.crossplane.io/v1
kind: CompositeResourceDefinition
metadata:
  name: compositesnowflakedatabases.platform.acme.com
spec:
  group: platform.acme.com
  names:
    kind: CompositeSnowflakeDatabase
    plural: compositesnowflakedatabases
  claimNames:
    kind: SnowflakeDatabase
    plural: snowflakedatabases
  versions:
  - name: v1alpha1
    served: true
    referenceable: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              parameters:
                type: object
                properties:
                  # The application or service name. The final DB name will be derived from this.
                  # Example: 'user-service' -> 'USER_SERVICE_DEV_DB'
                  serviceName:
                    type: string
                    description: "The name of the service that will own this database."
                  
                  # The target namespace where credentials will be written.
                  # This ensures credentials are only accessible by the intended application.
                  writeConnectionSecretToRef:
                    type: object
                    properties:
                      namespace:
                        type: string
                        description: "The namespace to which the connection secret will be written."
                    required:
                      - namespace

                  # Optional data retention period, defaults to 1 day for non-prod.
                  dataRetentionTimeInDays:
                    type: integer
                    description: "Specifies the number of days for which Time Travel actions can be performed on the database."
                    default: 1
                required:
                  - serviceName
                  - writeConnectionSecretToRef
            required:
              - parameters
          status:
            type: object
            properties:
              databaseName:
                type: string
                description: "The fully-qualified name of the provisioned Snowflake database."
              userName:
                type: string
                description: "The service user created for this database."
              conditions:
                type: array
                items:
                  type: object
                  properties:
                    type:
                      type: string
                    status:
                      type: string
                    lastTransitionTime:
                      type: string
                    reason:
                      type: string
                    message:
                      type: string

The key fields here are serviceName and writeConnectionSecretToRef. We derive the database name, role name, and username from serviceName to enforce a consistent naming scheme. writeConnectionSecretToRef is critical for security; it dictates where the generated credentials will be placed, ensuring they land in the application’s namespace.
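
The derivation described above can be sketched as a small, pure function. This is a minimal illustration rather than the exact code in our reconciler: the `derive_names` helper, the `SnowflakeNames` tuple, and the validation pattern are hypothetical names and rules introduced here. Because the derived names end up interpolated into SQL statements, the sketch rejects anything that is not a plain, DNS-style service name.

```python
import re
from typing import NamedTuple

# Hypothetical rule: lowercase letters, digits, and hyphens, starting with a letter.
_SERVICE_NAME_RE = re.compile(r"[a-z][a-z0-9-]{0,62}")


class SnowflakeNames(NamedTuple):
    database: str
    role: str
    user: str


def derive_names(service_name: str, env: str = "DEV") -> SnowflakeNames:
    """Map a service name to Snowflake identifiers, rejecting unsafe input."""
    if not _SERVICE_NAME_RE.fullmatch(service_name):
        raise ValueError(f"invalid serviceName: {service_name!r}")
    # Hyphens are not valid in unquoted Snowflake identifiers, so use underscores.
    prefix = f"{service_name.replace('-', '_').upper()}_{env.upper()}"
    return SnowflakeNames(f"{prefix}_DB", f"{prefix}_ROLE", f"{prefix}_USER")
```

With these assumptions, `derive_names("user-service")` yields the database name `USER_SERVICE_DEV_DB`, matching the example in the XRD comment.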

Phase 2: The Composition and the “NoOp” Resource

With the API defined, we needed a Composition. In a typical Crossplane setup, the Composition would map our abstract CompositeSnowflakeDatabase to concrete resources managed by a provider, like an RDSInstance from provider-aws. We don’t have a provider that performs our custom logic.

The pitfall here is trying to force a pre-existing provider to do something it wasn’t designed for. Instead, we use a “NoOp” or placeholder resource. We use the official provider-kubernetes to create a simple Object resource that essentially just represents the claim in the cluster. Its existence is the signal our external reconciler will look for. It does nothing on its own, but it completes the Crossplane composition flow, allowing the claim to reach a Ready state once the placeholder Object is created.

apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: snowflake.database.platform.acme.com
  labels:
    provider: internal
spec:
  compositeTypeRef:
    apiVersion: platform.acme.com/v1alpha1
    kind: CompositeSnowflakeDatabase
  # This composition uses the kubernetes provider to create a placeholder object.
  # The actual logic is handled by an external GitHub Actions workflow.
  resources:
    - name: placeholder
      base:
        apiVersion: kubernetes.crossplane.io/v1alpha1
        kind: Object
        spec:
          forProvider:
            manifest:
              apiVersion: v1
              kind: ConfigMap
              metadata:
                name: "snowflake-claim-placeholder"
          providerConfigRef:
            name: kubernetes-in-cluster-config # Assumes an in-cluster provider config
      patches:
        - fromFieldPath: "metadata.uid"
          toFieldPath: "spec.forProvider.manifest.metadata.name"
          transforms:
            - type: string
              string:
                fmt: "snowflake-db-claim-%s"
        - fromFieldPath: "spec.claimRef.namespace"
          toFieldPath: "spec.forProvider.manifest.metadata.namespace"

This Composition is deceptively simple. It creates a ConfigMap whose name is derived from the composite resource’s UID (the patch reads metadata.uid from the composite, not from the claim). This ConfigMap serves no purpose other than to exist, signaling to Crossplane that the resource has been “provisioned.” The real provisioning happens next.

Phase 3: The Reconciliation Loop in GitHub Actions

This is the heart of our custom control plane. A scheduled workflow acts as our operator’s reconciliation loop. It’s less efficient than a true event-driven operator but vastly simpler to implement and maintain.

The workflow file, .github/workflows/snowflake-reconciler.yml:

name: Snowflake Resource Reconciler

on:
  workflow_dispatch: # For manual runs
  schedule:
    # Run every 15 minutes. A compromise between responsiveness and cost.
    - cron: '*/15 * * * *'

jobs:
  reconcile:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write # Required for cloud authentication

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: 'Setup Python'
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'

      - name: 'Install dependencies'
        run: |
          python -m pip install --upgrade pip
          pip install kubernetes pyjwt cryptography requests

      # In a real-world project, authenticate to your cloud provider
      # to get temporary credentials for kubectl.
      # This example assumes kubectl is configured manually via secrets for simplicity.
      - name: Configure Kubectl
        run: |
          mkdir -p $HOME/.kube
          echo "${{ secrets.KUBE_CONFIG_DATA }}" | base64 -d > $HOME/.kube/config
          
      - name: Run Snowflake Reconciliation Script
        env:
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_RECONCILER_USER }}
          SNOWFLAKE_PRIVATE_KEY: ${{ secrets.SNOWFLAKE_RECONCILER_PK }}
          RECONCILER_ROLE: ${{ secrets.SNOWFLAKE_RECONCILER_ROLE }}
        run: python ./scripts/reconcile_snowflake.py

The workflow is straightforward: check out the code, install Python dependencies, configure kubectl, and run the reconciliation script. The critical part is security. We are using Snowflake’s Key Pair Authentication. The private key for the service account is stored as a GitHub Encrypted Secret. The public key is assigned to the service account user in Snowflake. This is far more secure than using a password.

Phase 4: The Core Logic - The Python Reconciler

The Python script (scripts/reconcile_snowflake.py) contains the logic that would normally reside inside an operator’s controller.

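First, the setup and connection logic. We use JWT for authentication with the Snowflake SQL API. Each run signs a short-lived token with the private key, so no password is ever sent to Snowflake, although the key itself remains a long-lived secret that must be protected and rotated.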

# scripts/reconcile_snowflake.py

import os
import json
import uuid
import time
import base64
from datetime import timedelta, timezone, datetime

import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend

from kubernetes import client, config

# --- Configuration ---
SNOWFLAKE_ACCOUNT = os.environ['SNOWFLAKE_ACCOUNT']
SNOWFLAKE_USER = os.environ['SNOWFLAKE_USER']
SNOWFLAKE_PRIVATE_KEY_STR = os.environ['SNOWFLAKE_PRIVATE_KEY']
RECONCILER_ROLE = os.environ['RECONCILER_ROLE']

# Constants
API_VERSION = "platform.acme.com/v1alpha1"
KIND_PLURAL = "compositesnowflakedatabases"
MANAGED_BY_LABEL = "platform.acme.com/managed-by"
FINALIZER_NAME = "platform.acme.com/snowflake-finalizer"
ENV = "DEV" # Should be configurable

def generate_snowflake_jwt():
    """Generates a JWT for Snowflake Key Pair Authentication."""
    
    # In a production system, avoid exposing the raw private key string directly.
    # Use a secure secret management system.
    private_key = serialization.load_pem_private_key(
        SNOWFLAKE_PRIVATE_KEY_STR.encode(),
        password=None, # Assuming the key is not encrypted
        backend=default_backend()
    )

    # The public key fingerprint must be pre-configured in Snowflake for the user.
    # Command: DESC USER <user_name>;
    # Look for RSA_PUBLIC_KEY_FP
    public_key_fp = "SHA256:..." # You must get this from your Snowflake account
    
    # Snowflake expects the account identifier and user name in uppercase.
    account_identifier = SNOWFLAKE_ACCOUNT.split('.')[0].upper()
    qualified_username = f"{account_identifier}.{SNOWFLAKE_USER.upper()}"
    
    now = datetime.now(timezone.utc)
    
    payload = {
        'iss': f"{qualified_username}.{public_key_fp}",
        'sub': qualified_username,
        'iat': now,
        'exp': now + timedelta(minutes=59) # Stay under Snowflake's 1-hour maximum token lifetime
    }

    return jwt.encode(payload, private_key, algorithm='RS256')

# ... (requests session setup would go here)
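
The claim-building step is easy to get subtly wrong, so it can help to isolate it in a function that is testable without a key. The following is a sketch under stated assumptions: `build_jwt_claims` is a hypothetical helper, the fingerprint is passed in rather than computed, and signing is left to the caller (for example with `jwt.encode` as in the script).

```python
from datetime import datetime, timedelta, timezone


def build_jwt_claims(account: str, user: str, public_key_fp: str,
                     now: datetime,
                     lifetime: timedelta = timedelta(minutes=59)) -> dict:
    """Assemble Snowflake key-pair JWT claims (signing is left to the caller)."""
    if lifetime > timedelta(hours=1):
        raise ValueError("Snowflake caps key-pair JWT lifetime at one hour")
    # Drop anything after the first dot and uppercase both parts.
    account_identifier = account.split(".")[0].upper()
    qualified_username = f"{account_identifier}.{user.upper()}"
    issued_at = int(now.timestamp())
    return {
        "iss": f"{qualified_username}.{public_key_fp}",
        "sub": qualified_username,
        "iat": issued_at,
        "exp": issued_at + int(lifetime.total_seconds()),
    }
```

Passing `now` explicitly keeps the function deterministic, which makes the uppercase normalization and the expiry arithmetic straightforward to unit test.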

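The core reconciliation logic fetches all CompositeSnowflakeDatabase resources and treats them as the desired state. Rather than diffing against Snowflake, it replays idempotent statements (CREATE ... IF NOT EXISTS, DROP ... IF EXISTS, GRANT) so that each run converges Snowflake toward that state.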

# Part of scripts/reconcile_snowflake.py

class SnowflakeAPIClient:
    # ... (Implementation for posting queries to Snowflake SQL API)
    # This class would handle making HTTP requests, error handling, etc.
    # It would use the generated JWT in the Authorization header.
    def execute_sql(self, sql_text, role):
        # Placeholder for actual API call logic
        print(f"Executing SQL with role {role}: {sql_text}")
        # In a real implementation, this would return results or raise an exception.
        return {"status": "success"}

def reconcile_deletion(k8s_api, k8s_custom_obj_api, obj, sf_client):
    """Handles the deletion of a Snowflake database when the CR is deleted."""
    metadata = obj.get("metadata", {})
    spec_params = obj.get("spec", {}).get("parameters", {})
    # Hyphens are not valid in unquoted Snowflake identifiers: 'user-service' -> 'USER_SERVICE'
    service_name = spec_params.get("serviceName", "").upper().replace("-", "_")
    db_name = f"{service_name}_{ENV}_DB"
    role_name = f"{service_name}_{ENV}_ROLE"
    user_name = f"{service_name}_{ENV}_USER"

    print(f"De-provisioning resources for {db_name}...")
    
    # Idempotent DROP statements. The role and user were created with USERADMIN,
    # which therefore owns them and can drop them; ACCOUNTADMIN is not needed.
    sf_client.execute_sql(f"DROP DATABASE IF EXISTS {db_name};", RECONCILER_ROLE)
    sf_client.execute_sql(f"DROP ROLE IF EXISTS {role_name};", "USERADMIN")
    sf_client.execute_sql(f"DROP USER IF EXISTS {user_name};", "USERADMIN")

    # Remove only our finalizer so Kubernetes can delete the object
    remaining = [f for f in metadata.get("finalizers", []) if f != FINALIZER_NAME]
    k8s_custom_obj_api.patch_cluster_custom_object(
        group="platform.acme.com",
        version="v1alpha1",
        plural=KIND_PLURAL,
        name=metadata["name"],
        body={"metadata": {"finalizers": remaining}},
    )
    print(f"Finalizer removed. Deletion of {db_name} complete.")


def reconcile_creation_or_update(k8s_api, k8s_custom_obj_api, obj, sf_client):
    """Handles creation and state reconciliation for a Snowflake database."""
    metadata = obj.get("metadata", {})
    spec_params = obj.get("spec", {}).get("parameters", {})
    
    # --- Derive names from spec ---
    # Hyphens are not valid in unquoted Snowflake identifiers: 'user-service' -> 'USER_SERVICE'
    service_name = spec_params.get("serviceName", "").upper().replace("-", "_")
    db_name = f"{service_name}_{ENV}_DB"
    role_name = f"{service_name}_{ENV}_ROLE"
    user_name = f"{service_name}_{ENV}_USER"
    retention_days = spec_params.get("dataRetentionTimeInDays", 1)
    secret_ns = spec_params["writeConnectionSecretToRef"]["namespace"]
    secret_name = f"{spec_params['serviceName'].lower()}-snowflake-credentials"

    print(f"Reconciling database: {db_name}")

    # --- 1. Create resources in Snowflake (idempotently) ---
    sf_client.execute_sql(
        f"CREATE DATABASE IF NOT EXISTS {db_name} DATA_RETENTION_TIME_IN_DAYS = {retention_days};",
        RECONCILER_ROLE
    )
    sf_client.execute_sql(f"CREATE ROLE IF NOT EXISTS {role_name};", "USERADMIN")
    
    # Check if user exists. If so, we can't get the password.
    # In a real system, you'd rotate the password or handle this case.
    # For simplicity, we assume if the user exists, the secret also exists.
    # A common mistake is not handling this, leading to orphaned secrets.
    
    # --- 2. Check if Kubernetes secret already exists ---
    try:
        k8s_api.read_namespaced_secret(name=secret_name, namespace=secret_ns)
        print(f"Secret '{secret_name}' already exists in namespace '{secret_ns}'. Skipping user creation.")
        user_password = None # Indicate we don't need to create a new secret
    except client.ApiException as e:
        if e.status == 404:
            print("Secret not found. Proceeding with user and secret creation.")
            # uuid4 is random, but it was not designed as a password generator;
            # prefer secrets.token_urlsafe() for production credentials.
            user_password = str(uuid.uuid4())
            
            sf_client.execute_sql(f"""
                CREATE USER IF NOT EXISTS {user_name}
                PASSWORD = '{user_password}'
                DEFAULT_ROLE = {role_name}
                DEFAULT_WAREHOUSE = 'COMPUTE_WH';
            """, "USERADMIN")
        else:
            raise

    # --- 3. Grant permissions ---
    sf_client.execute_sql(f"GRANT ROLE {role_name} TO USER {user_name};", "USERADMIN")
    sf_client.execute_sql(f"GRANT USAGE ON DATABASE {db_name} TO ROLE {role_name};", RECONCILER_ROLE)
    sf_client.execute_sql(f"GRANT ALL PRIVILEGES ON SCHEMA {db_name}.PUBLIC TO ROLE {role_name};", RECONCILER_ROLE)
    sf_client.execute_sql(f"GRANT USAGE ON WAREHOUSE COMPUTE_WH TO ROLE {role_name};", RECONCILER_ROLE)

    # --- 4. Create or update the K8s Secret ---
    if user_password:
        secret_data = {
            "SNOWFLAKE_USER": base64.b64encode(user_name.encode()).decode(),
            "SNOWFLAKE_PASSWORD": base64.b64encode(user_password.encode()).decode(),
            "SNOWFLAKE_ACCOUNT": base64.b64encode(SNOWFLAKE_ACCOUNT.encode()).decode(),
            "SNOWFLAKE_DATABASE": base64.b64encode(db_name.encode()).decode(),
            "SNOWFLAKE_ROLE": base64.b64encode(role_name.encode()).decode(),
        }
        secret_body = {
            "apiVersion": "v1",
            "kind": "Secret",
            "metadata": {
                "name": secret_name,
                "namespace": secret_ns,
                "labels": {MANAGED_BY_LABEL: "snowflake-reconciler"},
            },
            "data": secret_data,
        }
        # We only get here after a 404, so the Secret does not exist yet and must
        # be created; patch_namespaced_secret would fail on a missing object.
        k8s_api.create_namespaced_secret(namespace=secret_ns, body=secret_body)
        print(f"Secret '{secret_name}' created in namespace '{secret_ns}'.")

    # --- 5. Update the status of the custom resource ---
    status_patch = {
        "status": {
            "databaseName": db_name,
            "userName": user_name,
            # Add conditions to reflect the status
        }
    }
    k8s_custom_obj_api.patch_cluster_custom_object_status(
        group="platform.acme.com",
        version="v1alpha1",
        plural=KIND_PLURAL,
        name=metadata["name"],
        body=status_patch,
    )


def main():
    """Main reconciliation loop."""
    config.load_kube_config()
    k8s_api = client.CoreV1Api()
    k8s_custom_obj_api = client.CustomObjectsApi()
    sf_client = SnowflakeAPIClient() # This would be properly initialized

    print("Starting Snowflake reconciliation cycle...")
    
    # Get all custom resources
    try:
        cr_list = k8s_custom_obj_api.list_cluster_custom_object(
            group="platform.acme.com",
            version="v1alpha1",
            plural=KIND_PLURAL
        )["items"]
    except client.ApiException as e:
        print(f"Error fetching custom resources: {e}")
        return

    for obj in cr_list:
        metadata = obj.get("metadata", {})
        is_being_deleted = "deletionTimestamp" in metadata

        if is_being_deleted:
            if FINALIZER_NAME in metadata.get("finalizers", []):
                reconcile_deletion(k8s_api, k8s_custom_obj_api, obj, sf_client)
        else:
            # Add finalizer if it doesn't exist
            if FINALIZER_NAME not in metadata.get("finalizers", []):
                finalizers = metadata.get("finalizers", []) or []
                finalizers.append(FINALIZER_NAME)
                k8s_custom_obj_api.patch_cluster_custom_object(
                    group="platform.acme.com",
                    version="v1alpha1",
                    plural=KIND_PLURAL,
                    name=metadata["name"],
                    body={"metadata": {"finalizers": finalizers}},
                )
                print(f"Added finalizer to {metadata['name']}")
            
            reconcile_creation_or_update(k8s_api, k8s_custom_obj_api, obj, sf_client)
    
    print("Reconciliation cycle complete.")

if __name__ == "__main__":
    main()
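
The `execute_sql` placeholder above leaves out the HTTP call itself. A minimal sketch of how that request could be assembled against the Snowflake SQL API is shown below, using only the standard library so it stays self-contained (the script itself would more likely use a `requests` session). The `build_statement_request` helper and its `timeout_s` default are assumptions made for illustration, not part of the original script.

```python
import json
import urllib.request


def build_statement_request(account: str, jwt_token: str, sql_text: str,
                            role: str, timeout_s: int = 60) -> urllib.request.Request:
    """Build (but do not send) a POST to the SQL API statements endpoint."""
    body = {"statement": sql_text, "role": role, "timeout": timeout_s}
    return urllib.request.Request(
        url=f"https://{account}.snowflakecomputing.com/api/v2/statements",
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={
            "Authorization": f"Bearer {jwt_token}",
            # Tells Snowflake the bearer token is a key-pair JWT, not OAuth.
            "X-Snowflake-Authorization-Token-Type": "KEYPAIR_JWT",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )
```

Sending it would be a matter of `urllib.request.urlopen(req)` plus error handling. A real client should also handle the case where Snowflake answers that the statement is still executing and the result has to be polled for.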

A critical piece of the deletion logic is the use of a finalizer. When a user deletes the SnowflakeDatabase claim, Crossplane deletes the composite resource behind it, but because that object carries our finalizer, Kubernetes doesn’t immediately remove it. Instead, it sets a deletionTimestamp in the metadata. Our script detects this and runs the de-provisioning logic (dropping the database, role, and user). Only after that logic succeeds do we remove our custom finalizer from the object. Once the finalizers list is empty, Kubernetes garbage collects the object. Without this, deleting the CR would leave orphaned resources in Snowflake forever.
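
The branching that `main()` performs on each object can be summarized as a pure decision function, which is convenient to unit test without a cluster. This is an illustrative restatement of the logic above; the `Action` enum and `next_action` name are hypothetical and do not appear in the script.

```python
from enum import Enum


class Action(Enum):
    DEPROVISION = "deprovision"      # drop Snowflake objects, then remove finalizer
    SKIP = "skip"                    # being deleted, and our cleanup already ran
    ADD_FINALIZER = "add_finalizer"  # register the finalizer, then reconcile
    RECONCILE = "reconcile"          # converge Snowflake toward the spec


def next_action(metadata: dict, finalizer: str) -> Action:
    """Decide what the reconciler should do for one custom resource."""
    finalizers = metadata.get("finalizers") or []
    if "deletionTimestamp" in metadata:
        return Action.DEPROVISION if finalizer in finalizers else Action.SKIP
    return Action.RECONCILE if finalizer in finalizers else Action.ADD_FINALIZER
```

Keeping the decision separate from the side effects makes the ordering guarantee explicit: de-provisioning is only ever attempted while our finalizer is still present.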

The Complete Flow in Practice

Here’s the end-to-end user experience, which is now the gold standard for infrastructure provisioning in our organization.

sequenceDiagram
    participant Dev as Developer
    participant Git
    participant ArgoCD
    participant K8sAPI as Kubernetes API Server
    participant GHA as GitHub Actions Reconciler
    participant Snowflake

    Dev->>Git: Commits SnowflakeDatabase.yaml
    Git->>ArgoCD: Webhook triggers sync
    ArgoCD->>K8sAPI: Applies SnowflakeDatabase claim
    K8sAPI-->>ArgoCD: Claim created
    
    loop Every 15 minutes
        GHA->>K8sAPI: List compositesnowflakedatabases
        K8sAPI-->>GHA: Returns list of composite resources
    end

    GHA->>GHA: Detects new composite resource
    GHA->>Snowflake: (via SQL API) CREATE DATABASE, ROLE, USER
    Snowflake-->>GHA: Success
    GHA->>K8sAPI: Create Secret in app namespace
    K8sAPI-->>GHA: Secret created
    GHA->>K8sAPI: PATCH status on composite resource
    K8sAPI-->>GHA: Status updated

A developer writes a simple claim file:

apiVersion: platform.acme.com/v1alpha1
kind: SnowflakeDatabase
metadata:
  name: user-service-db
  namespace: user-service-prod
spec:
  parameters:
    serviceName: user-service
    dataRetentionTimeInDays: 7
    writeConnectionSecretToRef:
      namespace: user-service-prod

They commit this to their application’s infrastructure repository. ArgoCD picks it up and applies it. Typically within 15 minutes (GitHub may delay scheduled runs during periods of high load), a Kubernetes Secret named user-service-snowflake-credentials appears in the user-service-prod namespace, ready to be mounted into their application pods. Deleting the YAML file from Git, with pruning enabled in ArgoCD, triggers the reverse process, cleaning up all associated Snowflake resources.

This system, while using an unconventional reconciler, delivered exactly what we needed: a fully auditable, GitOps-driven, self-service platform for a critical data resource. The developer experience is vastly improved, and the platform team is no longer a bottleneck.

The current implementation is a solid v1, but it’s not without its limitations. The 15-minute polling interval for the GitHub Actions workflow means developers might wait that long for their database. A truly event-driven architecture, likely requiring a lightweight Kubernetes controller to watch for custom resource changes and trigger the workflow via a workflow_dispatch event, would provide near-instantaneous provisioning. Furthermore, the reconciliation script itself could be more robust. In a high-volume production environment, it would be better packaged as a container and run as a Kubernetes Job, providing better logging, resource management, and failure isolation than a script running directly on a GitHub Actions runner. Finally, managing state by comparing desired K8s state with actual Snowflake state works, but it’s brittle. A more mature version would write metadata back to a dedicated control table within Snowflake itself, making the reconciliation process more robust and its state easier to debug.
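
As a rough idea of what the event-driven trigger could look like, the sketch below builds the GitHub REST API call that fires a `workflow_dispatch` event. It is not something we have built: the `build_dispatch_request` helper, the `acme/platform-infra` repository, and the token handling are all placeholders, and the watching controller that would call it is left out entirely.

```python
import json
import urllib.request


def build_dispatch_request(owner: str, repo: str, workflow_file: str,
                           token: str, ref: str = "main") -> urllib.request.Request:
    """Build (but do not send) a request that triggers a workflow_dispatch run."""
    url = (f"https://api.github.com/repos/{owner}/{repo}"
           f"/actions/workflows/{workflow_file}/dispatches")
    return urllib.request.Request(
        url=url,
        data=json.dumps({"ref": ref}).encode("utf-8"),
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github+json",
            "Content-Type": "application/json",
        },
    )


# Hypothetical usage: the repository name and token are placeholders.
req = build_dispatch_request("acme", "platform-infra",
                             "snowflake-reconciler.yml", token="<token>")
```

Because the existing workflow already declares `workflow_dispatch`, a controller could call this whenever a custom resource changes while the cron schedule stays in place as a safety net.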

