🏠🛡️ We Have CSPM at Home: Using DuckDB and SQL to Discover Your Lab's Cloud Security Posture

May 5, 2025 12 min

📚 DuckDB for Cloud Security

This series explores how to leverage DuckDB for cloud security analysis, taking you from basic concepts to advanced security monitoring techniques. Learn to build your own CSPM solutions, analyze security tool outputs, and create powerful DIY security workflows.

Introduction

If you’ve spent any time in cloud security, you’re familiar with the term CSPM (Cloud Security Posture Management). These commercial tools promise to monitor your cloud environments, detect misconfigurations, and keep you safe from the ever-growing list of cloud security threats. They’re also expensive—often costing thousands of dollars per month, which puts them out of reach for smaller teams, educational environments, or labs.

But what if I told you that with a bit of SQL knowledge and some free tools, you could build much of this capability yourself? In this post, I’ll show you how to use DuckDB and SQL to create your own “CSPM at home” solution that can:

  1. Collect resource configurations from AWS, Azure, and GCP
  2. Analyze them for security issues
  3. Track changes over time
  4. Generate reports on your security posture

This approach won’t replace all the features of a commercial CSPM, but it can provide remarkable value at a fraction of the cost—or completely free for small environments.

Prerequisites

Before we begin, you’ll need:

  • Basic knowledge of SQL
  • DuckDB installed (brew install duckdb or download from duckdb.org)
  • Cloud provider CLI tools installed and configured:
    • AWS CLI (aws)
    • Azure CLI (az)
    • Google Cloud CLI (gcloud)
  • Python 3.8+ (for the Python examples)
  • Go 1.20+ (for the Go examples)

If you’re following along with our honeypot lab setup from the previous post, you already have the perfect environment to experiment with these techniques.

Part 1: Collecting Cloud Resources with CLI Tools

Let’s start with the simplest approach: using each provider’s CLI to export resource configurations as JSON files, then loading those files into DuckDB.

AWS Resources

# Collect EC2 instance information
aws ec2 describe-instances > aws_instances.json

# Collect security group information
aws ec2 describe-security-groups > aws_security_groups.json

# Collect VPC information
aws ec2 describe-vpcs > aws_vpcs.json

# Start DuckDB and load the data
duckdb cspm.db

Inside DuckDB, we can create tables from our JSON files:

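-- Create a table for EC2 instances
-- The AWS CLI emits PascalCase keys, and read_json_auto infers nested objects
-- as STRUCTs, so fields are reached with dot notation. A key that appears in no
-- record (for example PublicIpAddress when nothing is public) is missing from
-- the inferred schema and has to be dropped from the SELECT.
CREATE TABLE aws_instances AS
SELECT 
  r.InstanceId AS instance_id,
  r.InstanceType AS instance_type,
  r.PrivateIpAddress AS private_ip,
  r.PublicIpAddress AS public_ip,
  r.State.Name AS state,
  r.VpcId AS vpc_id,
  r.SubnetId AS subnet_id,
  r.KeyName AS key_name,
  r.IamInstanceProfile.Arn AS instance_profile
FROM read_json_auto('aws_instances.json')
  CROSS JOIN UNNEST(Reservations) AS res(reservation)
  CROSS JOIN UNNEST(reservation.Instances) AS inst(r);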

-- Create a table for security groups (one row per group, rule, and IPv4 range)
-- Key names follow the AWS CLI's PascalCase output.
CREATE TABLE aws_security_groups AS
SELECT 
  sg.GroupId AS group_id,
  sg.GroupName AS group_name,
  sg.Description AS description,
  sg.VpcId AS vpc_id,
  ip.IpProtocol AS protocol,
  ip.FromPort AS fromPort,
  ip.ToPort AS toPort,
  ip_range.CidrIp AS cidr
FROM read_json_auto('aws_security_groups.json')
  CROSS JOIN UNNEST(SecurityGroups) AS groups(sg)
  CROSS JOIN UNNEST(sg.IpPermissions) AS perms(ip)
  CROSS JOIN UNNEST(ip.IpRanges) AS ranges(ip_range);
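
The double UNNEST mirrors how describe-instances nests its output: Reservations contain Instances. As a rough illustration, the same flattening in plain Python might look like the sketch below. The flatten_instances helper and the sample document are made up for this example; only the key names follow the AWS CLI output.

```python
from typing import Any, Dict, List


def flatten_instances(describe_output: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one flat row per instance from describe-instances style JSON."""
    rows = []
    for reservation in describe_output.get("Reservations", []):
        for inst in reservation.get("Instances", []):
            rows.append({
                "instance_id": inst.get("InstanceId"),
                "instance_type": inst.get("InstanceType"),
                # PublicIpAddress is simply missing for private-only instances
                "public_ip": inst.get("PublicIpAddress"),
                "state": inst.get("State", {}).get("Name"),
                "instance_profile": inst.get("IamInstanceProfile", {}).get("Arn"),
            })
    return rows


# Hypothetical sample shaped like the CLI output; every value is invented.
sample = {
    "Reservations": [
        {"Instances": [
            {"InstanceId": "i-0aaa", "InstanceType": "t3.micro",
             "PublicIpAddress": "203.0.113.10", "State": {"Name": "running"}},
            {"InstanceId": "i-0bbb", "InstanceType": "t3.small",
             "State": {"Name": "stopped"}},
        ]},
    ]
}

rows = flatten_instances(sample)
for row in rows:
    print(row["instance_id"], row["public_ip"], row["state"])
```

Missing keys come back as None here, which is the same role NULL plays in the DuckDB table.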

Azure Resources

# Collect virtual machine information
az vm list --show-details > azure_vms.json

# Collect network security group information
az network nsg list > azure_nsgs.json

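# Optional: rules for a single NSG. The command requires both names; the two variables are placeholders.
# The NSG list above already embeds each group's securityRules.
az network nsg rule list --resource-group "$RESOURCE_GROUP" --nsg-name "$NSG_NAME" --include-default > azure_nsg_rules.json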

# Start DuckDB (or continue from above)
duckdb cspm.db

Inside DuckDB:

-- Create a table for Azure VMs
-- az vm list flattens the ARM "properties" object, so vmId and
-- provisioningState are top-level keys. DuckDB list indexes start at 1.
CREATE TABLE azure_vms AS
SELECT
  id,
  name,
  resourceGroup AS resource_group,
  location,
  hardwareProfile.vmSize AS vm_size,
  vmId AS vm_id,
  provisioningState AS state,
  networkProfile.networkInterfaces[1].id AS primary_nic,
  storageProfile.osDisk.osType AS os_type,
  tags
FROM read_json_auto('azure_vms.json');

-- Create a table for NSG rules
-- az network nsg list embeds each group's custom rules in securityRules
-- (and the platform defaults in defaultSecurityRules), with rule properties flattened.
CREATE TABLE azure_nsg_rules AS
SELECT
  n.id AS nsg_id,
  n.name AS nsg_name,
  n.resourceGroup AS resource_group,
  r.name AS rule_name,
  r.protocol AS protocol,
  r.sourceAddressPrefix AS source_prefix,
  r.destinationAddressPrefix AS destination_prefix,
  r.sourcePortRange AS source_port,
  r.destinationPortRange AS destination_port,
  r.access AS access,
  r.direction AS direction,
  r.priority AS priority
FROM read_json_auto('azure_nsgs.json') AS n
  CROSS JOIN UNNEST(n.securityRules) AS rules(r);

GCP Resources

# Collect compute instances
gcloud compute instances list --format=json > gcp_instances.json

# Collect firewall rules
gcloud compute firewall-rules list --format=json > gcp_firewall_rules.json

# Collect VPC networks
gcloud compute networks list --format=json > gcp_networks.json

# Start DuckDB (or continue from above)
duckdb cspm.db

Inside DuckDB:

-- Create a table for GCP instances
-- read_json_auto infers JSON arrays as LISTs of STRUCTs; DuckDB list indexes start at 1.
CREATE TABLE gcp_instances AS
SELECT
  id,
  name,
  machineType,
  zone,
  status,
  networkInterfaces[1].networkIP AS internal_ip,
  networkInterfaces[1].accessConfigs[1].natIP AS external_ip,
  serviceAccounts[1].email AS service_account,
  labels,
  creationTimestamp
FROM read_json_auto('gcp_instances.json');

-- Create a table for firewall rules
CREATE TABLE gcp_firewall_rules AS
SELECT
  id,
  name,
  network,
  direction,
  priority,
  sourceRanges,
  allowed
FROM read_json_auto('gcp_firewall_rules.json');

Finding Security Issues with SQL Queries

Now that we have our cloud resource data in DuckDB tables, we can write SQL queries to find security issues. Here are some examples:

AWS: Public EC2 Instances with Admin IAM Roles

SELECT 
  i.instance_id,
  i.instance_type,
  i.public_ip,
  i.instance_profile
FROM 
  aws_instances i
WHERE 
  i.public_ip IS NOT NULL
  -- Case-insensitive name heuristic only; it does not inspect the attached policies
  AND i.instance_profile ILIKE '%admin%';

AWS: Security Groups with Unrestricted Access

SELECT 
  group_id,
  group_name,
  protocol,
  fromPort,
  toPort,
  cidr
FROM 
  aws_security_groups
WHERE 
  cidr = '0.0.0.0/0'
  AND (
    protocol = '-1'  -- All traffic
    -- SSH or RDP, including wider port ranges that contain them
    OR (protocol = 'tcp' AND (
      (fromPort <= 22 AND toPort >= 22) OR (fromPort <= 3389 AND toPort >= 3389)
    ))
  );
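
A security group rule describes a port range, so it exposes SSH whenever 22 falls anywhere between fromPort and toPort. The sketch below spells out that range test in Python for rows shaped like the aws_security_groups table. The helper names and sample rules are hypothetical, and only TCP and the all-traffic protocol value '-1' are considered.

```python
import ipaddress
from typing import Any, Dict, Optional

SENSITIVE_PORTS = (22, 3389)  # SSH and RDP


def is_open_to_world(cidr: str) -> bool:
    """True for 0.0.0.0/0 (or ::/0), i.e. a network with prefix length 0."""
    return ipaddress.ip_network(cidr, strict=False).prefixlen == 0


def exposes_port(protocol: str, from_port: Optional[int],
                 to_port: Optional[int], port: int) -> bool:
    """True if the rule's protocol and port range include the given TCP port."""
    if protocol == "-1":  # all traffic; AWS omits the port fields here
        return True
    if protocol != "tcp" or from_port is None or to_port is None:
        return False
    return from_port <= port <= to_port


def risky_rule(rule: Dict[str, Any]) -> bool:
    """Flag world-open rules that reach any sensitive port."""
    return is_open_to_world(rule["cidr"]) and any(
        exposes_port(rule["protocol"], rule.get("fromPort"), rule.get("toPort"), p)
        for p in SENSITIVE_PORTS
    )


wide_open = {"cidr": "0.0.0.0/0", "protocol": "tcp", "fromPort": 0, "toPort": 65535}
https_only = {"cidr": "0.0.0.0/0", "protocol": "tcp", "fromPort": 443, "toPort": 443}
print(risky_rule(wide_open))   # True: the range 0-65535 contains port 22
print(risky_rule(https_only))  # False
```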

Azure: VMs with Public IPs

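SELECT
  v.id,
  v.name,
  v.resource_group,
  v.vm_size,
  src.publicIps AS public_ips
FROM
  azure_vms AS v
  -- azure_vms keeps no IP columns, so join back to the raw export.
  -- --show-details adds publicIps as a string that is empty when no public IP is attached.
  JOIN read_json_auto('azure_vms.json') AS src ON src.id = v.id
WHERE
  src.publicIps IS NOT NULL AND src.publicIps <> '';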

Azure: NSG Rules Allowing Broad Access

SELECT
  nsg_name,
  rule_name,
  protocol,
  source_prefix,
  destination_port,
  access,
  direction,
  priority
FROM
  azure_nsg_rules
WHERE
  -- Parentheses matter: AND binds tighter than OR
  (source_prefix = '*' OR source_prefix = '0.0.0.0/0' OR source_prefix = 'Internet')
  AND access = 'Allow'
  AND direction = 'Inbound'
  AND (
    destination_port = '*' OR 
    destination_port = '22' OR 
    destination_port = '3389'
  );

GCP: Instances with Public IPs

SELECT
  name,
  machineType,
  zone,
  external_ip,
  service_account
FROM
  gcp_instances
WHERE
  external_ip IS NOT NULL;

GCP: Overly Permissive Firewall Rules

SELECT
  f.name,
  f.network,
  f.direction,
  f.priority,
  a.IPProtocol AS protocol,
  a.ports AS ports,
  f.sourceRanges AS source_ranges
FROM
  gcp_firewall_rules AS f
  -- One row per entry in the rule's allowed list, not just the first entry
  CROSS JOIN UNNEST(f.allowed) AS allow_entries(a)
WHERE
  list_contains(f.sourceRanges, '0.0.0.0/0')
  AND f.direction = 'INGRESS'
  AND (
    a.IPProtocol = 'all' OR
    -- Exact matches only: a range such as '20-25' is not caught here
    list_contains(a.ports, '22') OR
    list_contains(a.ports, '3389')
  );
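
GCP expresses ports as strings that can be a single port ("22") or a range ("8000-9000"), so a range-aware check is sometimes worth having next to the SQL. The following is a minimal Python sketch under that assumption; the function names are hypothetical, and it only looks at TCP and the "all" protocol.

```python
from typing import Any, Dict


def port_spec_covers(spec: str, port: int) -> bool:
    """True if a GCP port string ("22" or "8000-9000") includes `port`."""
    if "-" in spec:
        low, high = spec.split("-", 1)
        return int(low) <= port <= int(high)
    return int(spec) == port


def allowed_entry_exposes_tcp(entry: Dict[str, Any], port: int) -> bool:
    """Check one element of a firewall rule's `allowed` list for a TCP port."""
    protocol = entry.get("IPProtocol")
    if protocol == "all":
        return True
    if protocol != "tcp":
        return False
    ports = entry.get("ports")
    if not ports:
        # An entry without ports applies to every port of that protocol.
        return True
    return any(port_spec_covers(spec, port) for spec in ports)


print(port_spec_covers("2222", 22))   # False: a different port that merely contains "22"
print(port_spec_covers("20-25", 22))  # True: 22 sits inside the range
print(allowed_entry_exposes_tcp({"IPProtocol": "tcp", "ports": ["80", "3000-4000"]}, 3389))  # True
```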

Cross-Cloud Query: Count of Vulnerable Resources

-- Count of public-facing resources across clouds
SELECT
  'AWS' as cloud,
  COUNT(*) as public_instances
FROM
  aws_instances
WHERE
  public_ip IS NOT NULL

UNION ALL

SELECT
  'Azure' as cloud,
  COUNT(*) as public_instances
FROM
  azure_vms AS v
  -- azure_vms keeps no IP columns, so join back to the raw export for publicIps
  JOIN read_json_auto('azure_vms.json') AS src ON src.id = v.id
WHERE
  src.publicIps IS NOT NULL AND src.publicIps <> ''

UNION ALL

SELECT
  'GCP' as cloud,
  COUNT(*) as public_instances
FROM
  gcp_instances
WHERE
  external_ip IS NOT NULL;

Part 2: Scripting with Python

While CLI commands work for ad-hoc analysis, Python scripts provide more flexibility and can handle pagination, error handling, and more complex data transformations. Here’s an example Python script to collect and analyze resources from all three cloud providers:

import json
import subprocess
import duckdb
import pandas as pd
import boto3
import azure.mgmt.compute
from azure.identity import DefaultAzureCredential
from google.cloud import compute_v1

# Connect to DuckDB
conn = duckdb.connect('cspm.db')

# AWS Collection
def collect_aws():
    ec2 = boto3.client('ec2')
    
    # Collect EC2 instances
    instances = []
    paginator = ec2.get_paginator('describe_instances')
    for page in paginator.paginate():
        for reservation in page['Reservations']:
            for instance in reservation['Instances']:
                instances.append({
                    'instance_id': instance.get('InstanceId'),
                    'instance_type': instance.get('InstanceType'),
                    'private_ip': instance.get('PrivateIpAddress'),
                    'public_ip': instance.get('PublicIpAddress'),
                    'state': instance.get('State', {}).get('Name'),
                    'vpc_id': instance.get('VpcId'),
                    'subnet_id': instance.get('SubnetId'),
                    'key_name': instance.get('KeyName'),
                    # Named instance_profile to match the column used by the Part 1 queries
                    'instance_profile': instance.get('IamInstanceProfile', {}).get('Arn')
                })
    
    # Create DataFrame and load into DuckDB
    df = pd.DataFrame(instances)
    conn.execute('DROP TABLE IF EXISTS aws_instances')
    conn.execute('CREATE TABLE aws_instances AS SELECT * FROM df')
    
    # Similar process for security groups, VPCs, etc.
    # ...

# Azure Collection
def collect_azure():
    credential = DefaultAzureCredential()
    compute_client = azure.mgmt.compute.ComputeManagementClient(
        credential=credential,
        subscription_id='your-subscription-id'
    )
    
    # Collect VMs
    vms = []
    for vm in compute_client.virtual_machines.list_all():
        vm_dict = vm.as_dict()
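        vms.append({
            'id': vm_dict.get('id'),
            'name': vm_dict.get('name'),
            'resource_group': vm_dict.get('id').split('/')[4],
            'location': vm_dict.get('location'),
            'vm_size': vm_dict.get('hardware_profile', {}).get('vm_size'),
            'os_type': vm_dict.get('storage_profile', {}).get('os_disk', {}).get('os_type'),
            # Placeholder so the cross-cloud query below can reference this column.
            # VM objects only reference their NICs; resolving the public IP needs a
            # separate lookup through the network API, so Azure counts stay at 0 until then.
            'external_ip': None
            # Add more fields as needed
        })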
    
    # Create DataFrame and load into DuckDB
    df = pd.DataFrame(vms)
    conn.execute('DROP TABLE IF EXISTS azure_vms')
    conn.execute('CREATE TABLE azure_vms AS SELECT * FROM df')
    
    # Similar process for NSGs, vnets, etc.
    # ...

# GCP Collection
def collect_gcp():
    instance_client = compute_v1.InstancesClient()
    project_id = 'your-project-id'
    
    # Collect instances (simplified example)
    instances = []
    for zone in ['us-central1-a', 'us-central1-b']:  # Add your zones
        request = compute_v1.ListInstancesRequest(
            project=project_id,
            zone=zone
        )
        for instance in instance_client.list(request=request):
            instances.append({
                'id': instance.id,
                'name': instance.name,
                'machine_type': instance.machine_type.split('/')[-1],
                'zone': zone,
                'status': instance.status,
                'internal_ip': instance.network_interfaces[0].network_ip if instance.network_interfaces else None,
                'external_ip': instance.network_interfaces[0].access_configs[0].nat_ip 
                               if instance.network_interfaces and instance.network_interfaces[0].access_configs 
                               else None
            })
    
    # Create DataFrame and load into DuckDB
    df = pd.DataFrame(instances)
    conn.execute('DROP TABLE IF EXISTS gcp_instances')
    conn.execute('CREATE TABLE gcp_instances AS SELECT * FROM df')
    
    # Similar process for firewalls, networks, etc.
    # ...

# Run collection for all providers
collect_aws()
collect_azure()
collect_gcp()

# Run security queries
print("Instances with public IPs across clouds:")
result = conn.execute("""
    SELECT
      'AWS' as cloud,
      COUNT(*) as public_instances
    FROM
      aws_instances
    WHERE
      public_ip IS NOT NULL

    UNION ALL

    SELECT
      'Azure' as cloud,
      COUNT(*) as public_instances
    FROM
      azure_vms
    WHERE
      external_ip IS NOT NULL

    UNION ALL

    SELECT
      'GCP' as cloud,
      COUNT(*) as public_instances
    FROM
      gcp_instances
    WHERE
      external_ip IS NOT NULL;
""").fetchall()

for row in result:
    print(f"{row[0]}: {row[1]} public instances")

# Close connection
conn.close()

This Python script:

  1. Uses the native SDK for each cloud provider
  2. Handles pagination properly
  3. Transforms the data into a consistent format
  4. Loads everything into DuckDB
  5. Runs security queries against the collected data

You could extend this script to:

  • Schedule regular scans
  • Compare results over time to detect drift
  • Send alerts when new security issues are detected
  • Generate reports for compliance purposes
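
As one illustration of the drift idea, two scans can be compared as plain dictionaries keyed by resource ID. This is only a sketch: the snapshot shape (resource ID mapped to public IP, or None when private) and the detect_drift name are assumptions made for the example, not something the script above produces.

```python
from typing import Dict, List, Optional

# Hypothetical snapshot shape: resource ID -> public IP (None when private)
Snapshot = Dict[str, Optional[str]]


def detect_drift(previous: Snapshot, current: Snapshot) -> Dict[str, List[str]]:
    """Compare two scans and report added, removed, and newly public resources."""
    added = sorted(set(current) - set(previous))
    removed = sorted(set(previous) - set(current))
    # Newly public: has a public IP now and had none (or did not exist) before
    newly_public = sorted(
        rid for rid, ip in current.items()
        if ip is not None and previous.get(rid) is None
    )
    return {"added": added, "removed": removed, "newly_public": newly_public}


# Invented example data using documentation IP ranges
yesterday = {"i-0aaa": None, "i-0bbb": "203.0.113.10"}
today = {"i-0aaa": "198.51.100.7", "i-0ccc": "192.0.2.44"}

drift = detect_drift(yesterday, today)
print(drift)
```

Feeding the newly_public list into an alert is one way to cover the "new security issues" case without re-reporting everything on each run.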

Part 3: Building a Scalable Solution with Go

[This section will detail our Go-based solution for more scalable CSPM scanning across cloud providers.]

Comparing with Commercial CSPM Solutions

While our DIY solution provides tremendous value, it’s worth understanding how it compares to commercial CSPM tools:

Feature | DIY CSPM | Commercial CSPM
Cost | Free to very low | $$$$ (often per-resource pricing)
Setup complexity | Medium | Low
Maintenance effort | Medium | Low
Customization | Very high | Limited
Coverage breadth | Limited to what you build | Extensive
Detection depth | High (custom queries) | Medium (predefined rules)
Update frequency | Manual | Automatic
Integration with workflows | Custom development | Built-in
Compliance reporting | Manual | Automated

The DIY approach shines when:

  • Budget is limited
  • You need custom detection logic
  • You want full control over your security data
  • You’re focused on specific cloud resources
  • You’re using this as a learning experience

Best Practices for DIY CSPM

Based on experience building and maintaining DIY CSPM solutions, here are some best practices:

  1. Version your queries: Store them in a git repo so you can track changes over time
  2. Create tables for benchmark results: Save the results of each scan to track improvements
  3. Add resource ownership and tagging: Ensure you can track who owns each resource
  4. Implement change detection: Alert on new public resources or security group changes
  5. Automate remediation where possible: For critical issues, write scripts to fix them immediately
  6. Establish escalation paths: Know who to contact when you find serious issues
  7. Document exceptions: Track authorized policy exceptions so you don’t repeatedly alert on them
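
To make the exceptions practice concrete, here is a small Python sketch that drops findings covered by a documented, unexpired exception. The record fields (resource_id, check, expires) and the suppress_excepted helper are hypothetical; adapt them to however you store findings.

```python
from datetime import date
from typing import Any, Dict, List


def suppress_excepted(findings: List[Dict[str, Any]],
                      exceptions: List[Dict[str, Any]],
                      today: date) -> List[Dict[str, Any]]:
    """Return only the findings that are not covered by an active exception."""
    active = {
        (e["resource_id"], e["check"])
        for e in exceptions
        if e["expires"] >= today  # expired exceptions stop suppressing alerts
    }
    return [f for f in findings if (f["resource_id"], f["check"]) not in active]


# Invented example data
findings = [
    {"resource_id": "sg-0aaa", "check": "open_ssh"},
    {"resource_id": "sg-0bbb", "check": "open_ssh"},
    {"resource_id": "sg-0ccc", "check": "open_rdp"},
]
exceptions = [
    {"resource_id": "sg-0aaa", "check": "open_ssh", "expires": date(2025, 12, 31)},
    {"resource_id": "sg-0ccc", "check": "open_rdp", "expires": date(2025, 1, 31)},
]

remaining = suppress_excepted(findings, exceptions, today=date(2025, 5, 5))
for f in remaining:
    print(f["resource_id"], f["check"])
```

Giving every exception an expiry date means an authorized deviation gets re-reviewed instead of being silenced forever.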

Conclusion

Commercial CSPM tools have their place, but you don’t always need to spend thousands of dollars to get meaningful cloud security insights. With DuckDB, SQL, and a bit of scripting, you can build a surprisingly powerful “CSPM at home” solution tailored to your own environment.

This approach not only saves money but also gives you deeper insight into your cloud infrastructure and builds valuable skills in cloud security and data analysis. As your environment grows, you can decide whether to continue scaling your DIY solution or transition to a commercial offering.

Remember: the goal isn’t to replicate every feature of a commercial CSPM, but to focus on the security issues that matter most to your organization, detect them reliably, and remediate them promptly.

What DIY security tools have you built? Share your experiences in the comments!


The scripts and queries in this post are provided as examples and should be tested and adapted to your specific environment before use in production.

~jared gore