Advanced Usage Guide

Advanced topics, integrations, and best practices for power users and production deployments.

Custom Policy Creation

Overview

While KafkaGuard provides built-in policy tiers (baseline-dev and enterprise-default today, with finance-iso planned for Phase 2), you can create custom policies tailored to your organization's specific requirements.

When to Create Custom Policies

Create custom policies when you need to:

  • ✅ Enforce organization-specific compliance requirements
  • ✅ Combine controls from multiple tiers
  • ✅ Add custom controls for internal standards
  • ✅ Adjust control severity levels for your environment
  • ✅ Create environment-specific policies (dev, staging, prod)

Quick Policy Structure Reference

version: "1.0"
name: "My Custom Policy"
description: "Custom policy for our environment"
tier: "custom"

controls:
  - id: KG-XXX
    title: "Control title"
    description: "What this control checks"
    severity: HIGH | MEDIUM | LOW
    category: security | reliability | operational | compliance
    expr: |
      # CEL expression that returns true (pass) or false (fail)
      broker.config["auto.create.topics.enable"] == "false"
    remediation: |
      Step-by-step fix instructions
    compliance:
      pci_dss: ["4.1", "8.2"]
      soc2: ["CC6.1"]
      iso27001: ["A.12.6.1"]
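
As a concrete instance of the schema above (the control ID, expression, and compliance mapping here are illustrative examples, not shipped controls):

```yaml
version: "1.0"
name: "Prod Hardening"
description: "Example custom policy with one control"
tier: "custom"

controls:
  - id: KG-201                      # illustrative ID in the KG-001..KG-999 range
    title: "Topic auto-creation disabled"
    description: "Auto-created topics bypass review and inherit broker defaults"
    severity: HIGH
    category: security
    expr: |
      broker.config["auto.create.topics.enable"] == "false"
    remediation: |
      Set auto.create.topics.enable=false in server.properties and restart brokers.
    compliance:
      soc2: ["CC6.1"]
```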

Comprehensive Policy Creation Guide

For complete documentation on creating custom policies, see:

Policy Creation Guide (Story 9.3)

This comprehensive guide covers:

  • Policy file structure and schema
  • CEL expression syntax and examples
  • Available variables (broker, topic, cluster)
  • Control ID conventions (KG-001 to KG-999)
  • Severity and category guidelines
  • Compliance mapping (PCI-DSS, SOC2, ISO 27001)
  • Testing and validation
  • Examples and templates

Policy Validation

Always validate custom policies before deployment:

# Validate policy syntax
kafkaguard validate-policy --policy policies/custom-policy.yaml

# Test policy against dev cluster first
kafkaguard scan \
  --bootstrap kafka-dev:9092 \
  --policy policies/custom-policy.yaml \
  --format json \
  --out ./test-reports

# Review results and iterate
cat test-reports/scan-*.json | jq '.findings[] | select(.status == "FAILED")'

Report Customization

Selecting Report Formats

Generate specific report formats based on your needs:

# Single format
kafkaguard scan --bootstrap kafka:9092 --format json

# Multiple formats
kafkaguard scan --bootstrap kafka:9092 --format json,html,pdf

# All formats
kafkaguard scan --bootstrap kafka:9092 --format json,html,pdf,csv

Output Directory Management

Organize reports by environment, date, or cluster:

# By environment
kafkaguard scan \
  --bootstrap kafka-prod:9095 \
  --out /var/reports/prod

# By date
REPORT_DIR="/var/reports/$(date +%Y-%m-%d)"
mkdir -p "$REPORT_DIR"
kafkaguard scan \
  --bootstrap kafka:9092 \
  --out "$REPORT_DIR"

# By cluster
kafkaguard scan \
  --bootstrap kafka-cluster1:9092 \
  --out /var/reports/cluster1

Report Naming Convention

KafkaGuard uses a consistent naming pattern:

scan-<timestamp>-<scan_id>.<format>

Example:
scan-20251115140530-abc123def456.json
scan-20251115140530-abc123def456.html
scan-20251115140530-abc123def456.pdf
scan-20251115140530-abc123def456.csv

Benefits:

  • Chronological sorting (timestamp first)
  • Easy correlation across formats (same scan_id)
  • Unique identifiers prevent overwrites
  • Glob pattern matching (scan-*.json)
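
Because the pattern is fixed, the timestamp and scan_id can be recovered from a filename with plain shell (a small sketch using a hypothetical filename):

```shell
# Hypothetical report filename following scan-<timestamp>-<scan_id>.<format>
f="scan-20251115140530-abc123def456.json"

base="${f%.*}"                      # strip the format extension
ts=$(echo "$base" | cut -d- -f2)    # timestamp: 20251115140530
id=$(echo "$base" | cut -d- -f3)    # scan_id: abc123def456

echo "timestamp=$ts scan_id=$id"
```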

Report Archival Strategy

Implement report retention policies:

#!/bin/bash
# archive-reports.sh - Automated report archival

REPORTS_DIR="/var/reports/kafkaguard"
ARCHIVE_DIR="/var/reports/kafkaguard/archive"
RETENTION_DAYS=90

# Create archive structure
mkdir -p "$ARCHIVE_DIR/$(date +%Y-%m)"

# Archive reports older than 30 days
find "$REPORTS_DIR" -maxdepth 1 -name "scan-*" -mtime +30 \
  -exec mv {} "$ARCHIVE_DIR/$(date +%Y-%m)/" \;

# Compress archived reports older than 60 days (all formats, not just PDF)
find "$ARCHIVE_DIR" -name "scan-*" ! -name "*.gz" -mtime +60 -exec gzip {} \;

# Delete archived reports older than the retention period
find "$ARCHIVE_DIR" -name "scan-*" -mtime +"$RETENTION_DAYS" -delete

echo "Report archival complete"

Custom Report Templates (Phase 2)

Note: Custom report templates are planned for Phase 2. Currently, report formats use built-in templates.

Planned Features:

  • Custom HTML templates with organization branding
  • Custom PDF layouts and styling
  • Template variables for organization info
  • Export templates for reuse

Performance Tuning

Timeout Configuration

Adjust timeout based on cluster size and network conditions:

# Default timeout: 300 seconds (5 minutes)
kafkaguard scan --bootstrap kafka:9092

# Small clusters (<100 topics): 300 seconds
kafkaguard scan --bootstrap kafka:9092 --timeout 300

# Medium clusters (100-500 topics): 600 seconds (10 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 600

# Large clusters (500-1000 topics): 900 seconds (15 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 900

# Very large clusters (1000+ topics): 1800 seconds (30 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 1800

Parallel Collection Settings

KafkaGuard uses parallel collection by default for faster scans:

# Default: parallel enabled with 6 max collectors
kafkaguard scan --bootstrap kafka:9092

# Increase max collectors for large clusters (10+ brokers)
kafkaguard scan \
  --bootstrap kafka:9092 \
  --parallel true \
  --max-collectors 10

# Reduce max collectors if experiencing memory issues
kafkaguard scan \
  --bootstrap kafka:9092 \
  --max-collectors 3

# Disable parallel collection (not recommended, slower)
kafkaguard scan \
  --bootstrap kafka:9092 \
  --parallel false

Memory Optimization

Monitor and optimize memory usage:

# Monitor memory during scan
top -p $(pgrep kafkaguard)

# Set resource limits (Linux)
ulimit -v 524288  # Limit to 512MB virtual memory
kafkaguard scan --bootstrap kafka:9092

# Docker: Set memory limits
docker run --rm \
  --memory=256m \
  --cpus=1.0 \
  -v $(pwd)/policies:/policies:ro \
  -v $(pwd)/reports:/reports \
  aiopsone/kafkaguard:latest scan \
    --bootstrap kafka:9092 \
    --policy /policies/enterprise-default.yaml \
    --out /reports

Expected Memory Usage:

Cluster Size                   Expected Memory
3 brokers, <100 topics         50-100 MB
5 brokers, 100-500 topics      100-150 MB
10 brokers, 500-1000 topics    150-200 MB
20+ brokers, 1000+ topics      200-300 MB

If memory usage exceeds these ranges, please report to the KafkaGuard team.

Network Latency Considerations

Optimize for high-latency networks:

# High latency network (>100ms RTT)
kafkaguard scan \
  --bootstrap kafka:9092 \
  --timeout 900 \
  --max-collectors 3  # Reduce concurrent connections

# Monitor network latency
ping -c 10 kafka.example.com

Running Scans During Off-Peak Hours

Schedule scans during low-traffic periods:

# /etc/cron.d/kafkaguard-nightly
# Cron job for a 2 AM daily scan. In cron.d files the sixth field is the user,
# and the command must be on a single line (cron does not honor "\" continuations).
0 2 * * * kafkaguard /usr/local/bin/kafkaguard scan --bootstrap kafka-prod:9095 --policy /opt/kafkaguard/policies/enterprise-default.yaml --out /var/reports/kafkaguard/nightly --timeout 900 2>&1 | logger -t kafkaguard

Monitoring System Integration

Splunk Integration

Parse JSON reports and forward to Splunk for analysis:

#!/bin/bash
# splunk-forward.sh - Forward KafkaGuard reports to Splunk

REPORTS_DIR="/var/reports/kafkaguard"
SPLUNK_HEC_URL="https://splunk.example.com:8088/services/collector"
SPLUNK_HEC_TOKEN="your-hec-token"

# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)

# Extract findings and forward to Splunk
cat "$LATEST_JSON" | jq -c '.findings[]' | while read -r finding; do
  curl -k "$SPLUNK_HEC_URL" \
    -H "Authorization: Splunk $SPLUNK_HEC_TOKEN" \
    -d "{\"event\": $finding, \"sourcetype\": \"kafkaguard:scan\"}"
done

echo "Forwarded $(jq '.findings | length' "$LATEST_JSON") findings to Splunk"

Splunk Search Queries:

# All KafkaGuard findings
sourcetype=kafkaguard:scan

# Failed controls only
sourcetype=kafkaguard:scan status=FAILED

# HIGH severity failures
sourcetype=kafkaguard:scan status=FAILED severity=HIGH

# Failures by control ID
sourcetype=kafkaguard:scan status=FAILED | stats count by control_id

# Trend over time
sourcetype=kafkaguard:scan | timechart count by status

ELK Stack Integration

Index KafkaGuard reports in Elasticsearch for analysis and visualization:

#!/bin/bash
# elk-index.sh - Index KafkaGuard reports in Elasticsearch

REPORTS_DIR="/var/reports/kafkaguard"
ELASTICSEARCH_URL="http://elasticsearch:9200"
INDEX_NAME="kafkaguard-findings"

# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)

# Extract metadata and findings
METADATA=$(cat "$LATEST_JSON" | jq -c '.metadata')
FINDINGS=$(cat "$LATEST_JSON" | jq -c '.findings[]')

# Index each finding
echo "$FINDINGS" | while read -r finding; do
  # Combine metadata with finding
  DOCUMENT=$(jq -n \
    --argjson metadata "$METADATA" \
    --argjson finding "$finding" \
    '$metadata + $finding')

  # Index in Elasticsearch
  curl -X POST "$ELASTICSEARCH_URL/$INDEX_NAME/_doc" \
    -H 'Content-Type: application/json' \
    -d "$DOCUMENT"
done

echo "Indexed $(jq '.findings | length' "$LATEST_JSON") findings in Elasticsearch"

Kibana Visualizations:

  • Control Status Pie Chart: status field
  • Severity Distribution Bar Chart: severity field
  • Failed Controls Table: Filter status:FAILED, show control_id, title, severity
  • Score Trend Line Chart: metadata.timestamp vs summary.score
  • Compliance Heatmap: compliance.pci_dss, compliance.soc2, compliance.iso27001
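
Before building the dashboards, the severity distribution behind the bar chart can be sanity-checked offline with jq (the inline sample below stands in for a real report; field shapes follow the reports used above):

```shell
# Sample report fragment standing in for a real scan-*.json
cat > /tmp/sample-scan.json <<'EOF'
{"findings":[{"status":"FAILED","severity":"HIGH"},
             {"status":"FAILED","severity":"HIGH"},
             {"status":"PASSED","severity":"LOW"},
             {"status":"FAILED","severity":"MEDIUM"}]}
EOF

# Count FAILED findings per severity — mirrors the Kibana severity bar chart
jq '[.findings[] | select(.status == "FAILED") | .severity]
    | group_by(.) | map({(.[0]): length}) | add' /tmp/sample-scan.json
```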

Custom Monitoring with JSON Parsing

Extract specific metrics from JSON reports:

#!/bin/bash
# extract-metrics.sh - Extract metrics from KafkaGuard JSON report

LATEST_JSON=$(ls -t reports/scan-*.json | head -1)

# Extract overall score
SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
echo "Overall Score: $SCORE%"

# Extract control counts
TOTAL=$(cat "$LATEST_JSON" | jq -r '.summary.total_controls')
PASSED=$(cat "$LATEST_JSON" | jq -r '.summary.passed')
FAILED=$(cat "$LATEST_JSON" | jq -r '.summary.failed')

echo "Controls: $PASSED passed, $FAILED failed (out of $TOTAL)"

# Extract HIGH severity failures
FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')
echo "HIGH severity failures: $FAILED_HIGH"

# Export as Prometheus metrics (text format)
cat > /var/lib/node_exporter/kafkaguard.prom <<EOF
# HELP kafkaguard_scan_score Overall compliance score (0-100)
# TYPE kafkaguard_scan_score gauge
kafkaguard_scan_score $SCORE

# HELP kafkaguard_controls_total Total number of controls evaluated
# TYPE kafkaguard_controls_total gauge
kafkaguard_controls_total $TOTAL

# HELP kafkaguard_controls_passed Number of passed controls
# TYPE kafkaguard_controls_passed gauge
kafkaguard_controls_passed $PASSED

# HELP kafkaguard_controls_failed Number of failed controls
# TYPE kafkaguard_controls_failed gauge
kafkaguard_controls_failed $FAILED

# HELP kafkaguard_failures_high Number of HIGH severity failures
# TYPE kafkaguard_failures_high gauge
kafkaguard_failures_high $FAILED_HIGH
EOF

Prometheus Metrics Export (Phase 2)

Note: Native Prometheus metrics export is planned for Phase 2.

Planned Metrics:

  • kafkaguard_scan_duration_seconds - Scan duration
  • kafkaguard_scan_score - Overall compliance score
  • kafkaguard_controls_total - Total controls evaluated
  • kafkaguard_controls_passed - Passed controls count
  • kafkaguard_controls_failed - Failed controls count
  • kafkaguard_failures_by_severity - Failures by severity (labels: high, medium, low)
  • kafkaguard_scan_timestamp - Last scan timestamp

Ticketing System Integration

Jira Integration

Automatically create Jira tickets for failed controls:

#!/usr/bin/env python3
# jira-create-tickets.py - Create Jira tickets from KafkaGuard findings

import json
import sys
from jira import JIRA

# Jira configuration
JIRA_SERVER = 'https://jira.example.com'
JIRA_USERNAME = 'kafkaguard-bot'
JIRA_API_TOKEN = 'your-api-token'
JIRA_PROJECT = 'SEC'  # Security project

# Connect to Jira
jira = JIRA(server=JIRA_SERVER, basic_auth=(JIRA_USERNAME, JIRA_API_TOKEN))

# Load latest KafkaGuard report
with open(sys.argv[1]) as f:
    report = json.load(f)

# Process failed controls
failed_findings = [f for f in report['findings'] if f['status'] == 'FAILED' and f['severity'] == 'HIGH']

for finding in failed_findings:
    # Check if ticket already exists (avoid duplicates)
    jql = f'project = {JIRA_PROJECT} AND summary ~ "{finding["control_id"]}"'
    existing = jira.search_issues(jql)

    if existing:
        print(f"Ticket already exists for {finding['control_id']}: {existing[0].key}")
        continue

    # Create new ticket
    issue_dict = {
        'project': {'key': JIRA_PROJECT},
        'summary': f"{finding['control_id']}: {finding['title']}",
        'description': f"""
Kafka compliance control failed:

*Control ID:* {finding['control_id']}
*Severity:* {finding['severity']}
*Category:* {finding['category']}

*Evidence:*
{finding['evidence']}

*Remediation Steps:*
{finding['remediation']}

*Compliance Impact:*
- PCI-DSS: {', '.join(finding['compliance']['pci_dss'])}
- SOC2: {', '.join(finding['compliance']['soc2'])}
- ISO 27001: {', '.join(finding['compliance']['iso27001'])}

*Scan Details:*
- Cluster: {report['metadata']['cluster_id']}
- Timestamp: {report['metadata']['timestamp']}
- Policy: {report['metadata']['policy']}
        """,
        'issuetype': {'name': 'Bug'},
        'priority': {'name': 'High'},
        'labels': ['kafkaguard', 'compliance', 'security']
    }

    new_issue = jira.create_issue(fields=issue_dict)
    print(f"Created ticket {new_issue.key} for {finding['control_id']}")

print(f"Processed {len(failed_findings)} HIGH severity findings")

Usage:

# Run scan and create tickets
kafkaguard scan \
  --bootstrap kafka-prod:9095 \
  --policy policies/enterprise-default.yaml \
  --format json \
  --out /var/reports

LATEST_JSON=$(ls -t /var/reports/scan-*.json | head -1)
python3 jira-create-tickets.py "$LATEST_JSON"

ServiceNow Integration

Create ServiceNow incidents from KafkaGuard findings:

#!/bin/bash
# servicenow-create-incidents.sh - Create ServiceNow incidents

REPORTS_DIR="/var/reports/kafkaguard"
SNOW_INSTANCE="https://your-instance.service-now.com"
SNOW_USERNAME="kafkaguard-integration"
SNOW_PASSWORD="your-password"

# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)

# Extract HIGH severity failures
cat "$LATEST_JSON" | jq -c '.findings[] | select(.status == "FAILED" and .severity == "HIGH")' | while read -r finding; do
  CONTROL_ID=$(echo "$finding" | jq -r '.control_id')
  TITLE=$(echo "$finding" | jq -r '.title')
  EVIDENCE=$(echo "$finding" | jq -r '.evidence')
  REMEDIATION=$(echo "$finding" | jq -r '.remediation')

  # Build the JSON payload with jq so quotes/newlines in fields are escaped safely
  PAYLOAD=$(jq -n \
    --arg sd "Kafka Compliance: $CONTROL_ID - $TITLE" \
    --arg desc "Evidence: $EVIDENCE

Remediation: $REMEDIATION" \
    '{short_description: $sd, description: $desc,
      urgency: "2", impact: "2", priority: "2",
      category: "Security", subcategory: "Compliance"}')

  # Create ServiceNow incident
  curl -X POST "$SNOW_INSTANCE/api/now/table/incident" \
    -u "$SNOW_USERNAME:$SNOW_PASSWORD" \
    -H "Content-Type: application/json" \
    -d "$PAYLOAD"

  echo "Created incident for $CONTROL_ID"
done

Automation with Scripting

Bash Script Example

Multi-cluster scanning with email notifications:

#!/bin/bash
# automated-compliance-scan.sh - Automated multi-cluster scanning with alerts

set -e

# Configuration
CLUSTERS=("kafka-dev:9092" "kafka-staging:9095" "kafka-prod:9095")
POLICIES=("baseline-dev.yaml" "enterprise-default.yaml" "enterprise-default.yaml")
REPORTS_BASE="/var/reports/kafkaguard"
ALERT_EMAIL="security@example.com"

# Timestamp for this scan run
TIMESTAMP=$(date +%Y%m%d-%H%M%S)
SUMMARY_FILE="/tmp/scan-summary-$TIMESTAMP.txt"

echo "KafkaGuard Automated Compliance Scan - $TIMESTAMP" > "$SUMMARY_FILE"
echo "=================================================" >> "$SUMMARY_FILE"
echo "" >> "$SUMMARY_FILE"

# Scan each cluster
for i in "${!CLUSTERS[@]}"; do
  CLUSTER="${CLUSTERS[$i]}"
  POLICY="${POLICIES[$i]}"
  CLUSTER_NAME=$(echo "$CLUSTER" | cut -d: -f1)

  echo "Scanning $CLUSTER_NAME..." | tee -a "$SUMMARY_FILE"

  REPORT_DIR="$REPORTS_BASE/$CLUSTER_NAME/$TIMESTAMP"
  mkdir -p "$REPORT_DIR"

  # Run scan
  kafkaguard scan \
    --bootstrap "$CLUSTER" \
    --policy "policies/$POLICY" \
    --format json,html \
    --out "$REPORT_DIR" \
    --log-level info || true

  # Parse results
  LATEST_JSON=$(ls -t "$REPORT_DIR"/scan-*.json | head -1)

  if [ -f "$LATEST_JSON" ]; then
    SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
    FAILED=$(cat "$LATEST_JSON" | jq -r '.summary.failed')
    FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')

    echo "  Score: $SCORE%" | tee -a "$SUMMARY_FILE"
    echo "  Failed Controls: $FAILED" | tee -a "$SUMMARY_FILE"
    echo "  HIGH Severity Failures: $FAILED_HIGH" | tee -a "$SUMMARY_FILE"

    if [ "$FAILED_HIGH" -gt 0 ]; then
      echo "  ⚠️  ALERT: HIGH severity failures detected!" | tee -a "$SUMMARY_FILE"
    fi
  else
    echo "  ❌ Scan failed!" | tee -a "$SUMMARY_FILE"
  fi

  echo "" | tee -a "$SUMMARY_FILE"
done

# Send email summary
mail -s "KafkaGuard Compliance Scan Summary - $TIMESTAMP" "$ALERT_EMAIL" < "$SUMMARY_FILE"

echo "Scan complete. Summary sent to $ALERT_EMAIL"

Python Script Example

Advanced report aggregation and analysis:

#!/usr/bin/env python3
# aggregate-reports.py - Aggregate and analyze KafkaGuard reports

import json
import glob
from datetime import datetime
from collections import defaultdict

class ReportAggregator:
    def __init__(self, reports_dir):
        self.reports_dir = reports_dir
        self.reports = []

    def load_reports(self):
        """Load all JSON reports"""
        for report_file in glob.glob(f"{self.reports_dir}/**/scan-*.json", recursive=True):
            with open(report_file) as f:
                report = json.load(f)
                report['_file'] = report_file
                self.reports.append(report)
        print(f"Loaded {len(self.reports)} reports")

    def aggregate_scores(self):
        """Calculate aggregate scores"""
        scores = [r['summary']['score'] for r in self.reports]
        if not scores:  # avoid ZeroDivisionError when no reports were loaded
            return {'avg_score': 0.0, 'min_score': 0, 'max_score': 0, 'total_reports': 0}
        return {
            'avg_score': sum(scores) / len(scores),
            'min_score': min(scores),
            'max_score': max(scores),
            'total_reports': len(scores)
        }

    def aggregate_failures_by_control(self):
        """Count failures by control ID"""
        failures = defaultdict(int)

        for report in self.reports:
            for finding in report['findings']:
                if finding['status'] == 'FAILED':
                    failures[finding['control_id']] += 1

        # Sort by frequency
        return sorted(failures.items(), key=lambda x: x[1], reverse=True)

    def aggregate_failures_by_severity(self):
        """Count failures by severity"""
        severity_counts = defaultdict(int)

        for report in self.reports:
            for finding in report['findings']:
                if finding['status'] == 'FAILED':
                    severity_counts[finding['severity']] += 1

        return dict(severity_counts)

    def generate_trend_data(self):
        """Generate score trend over time"""
        trend = []

        for report in sorted(self.reports, key=lambda r: r['metadata']['timestamp']):
            trend.append({
                'timestamp': report['metadata']['timestamp'],
                'score': report['summary']['score'],
                'failed': report['summary']['failed']
            })

        return trend

    def generate_summary(self):
        """Generate comprehensive summary"""
        print("\nKafkaGuard Report Aggregation Summary")
        print("=" * 50)

        # Scores
        scores = self.aggregate_scores()
        print(f"\nOverall Scores:")
        print(f"  Average: {scores['avg_score']:.1f}%")
        print(f"  Min: {scores['min_score']}%")
        print(f"  Max: {scores['max_score']}%")
        print(f"  Total Reports: {scores['total_reports']}")

        # Top failures
        print(f"\nTop 10 Failed Controls:")
        failures = self.aggregate_failures_by_control()
        for control_id, count in failures[:10]:
            print(f"  {control_id}: {count} failures")

        # Severity distribution
        print(f"\nFailures by Severity:")
        severity = self.aggregate_failures_by_severity()
        for sev, count in severity.items():
            print(f"  {sev}: {count}")

        # Trend
        print(f"\nScore Trend (last 5 scans):")
        trend = self.generate_trend_data()
        for entry in trend[-5:]:
            print(f"  {entry['timestamp']}: {entry['score']}% ({entry['failed']} failed)")

if __name__ == '__main__':
    import sys

    reports_dir = sys.argv[1] if len(sys.argv) > 1 else '/var/reports/kafkaguard'

    aggregator = ReportAggregator(reports_dir)
    aggregator.load_reports()
    aggregator.generate_summary()

Usage:

python3 aggregate-reports.py /var/reports/kafkaguard

PowerShell Example (Windows)

# kafkaguard-scan.ps1 - Windows PowerShell automation

param(
    [string]$Bootstrap = "kafka.example.com:9092",
    [string]$Policy = "policies/enterprise-default.yaml",
    [string]$OutDir = "C:\Reports\KafkaGuard"
)

Write-Host "KafkaGuard Automated Scan" -ForegroundColor Green
Write-Host "=========================" -ForegroundColor Green

# Create reports directory
New-Item -ItemType Directory -Force -Path $OutDir | Out-Null

# Run scan
Write-Host "Scanning $Bootstrap..."
& kafkaguard scan `
    --bootstrap $Bootstrap `
    --policy $Policy `
    --format json,html `
    --out $OutDir `
    --log-level info

# Parse results
$LatestJson = Get-ChildItem -Path $OutDir -Filter "scan-*.json" | Sort-Object LastWriteTime -Descending | Select-Object -First 1

if ($LatestJson) {
    $Report = Get-Content $LatestJson.FullName | ConvertFrom-Json
    $Score = $Report.summary.score
    $Failed = $Report.summary.failed
    $FailedHigh = ($Report.findings | Where-Object { $_.status -eq "FAILED" -and $_.severity -eq "HIGH" }).Count

    Write-Host ""
    Write-Host "Results:" -ForegroundColor Yellow
    Write-Host "  Score: $Score%"
    Write-Host "  Failed Controls: $Failed"
    Write-Host "  HIGH Severity Failures: $FailedHigh"

    if ($FailedHigh -gt 0) {
        Write-Host "  WARNING: HIGH severity failures detected!" -ForegroundColor Red
    }
}
else {
    Write-Host "ERROR: Scan failed!" -ForegroundColor Red
    exit 1
}

Write-Host ""
Write-Host "Scan complete. Reports saved to $OutDir"

Air-Gapped Deployment Best Practices

Bundle Creation and Verification

Create reproducible, versioned bundles:

#!/bin/bash
# create-airgapped-bundle.sh - Create versioned air-gapped bundle

VERSION="1.0.0"
BUILD_DATE=$(date +%Y%m%d)
BUNDLE_NAME="kafkaguard-airgapped-${VERSION}-${BUILD_DATE}"
BUNDLE_DIR="$BUNDLE_NAME"

echo "Creating KafkaGuard air-gapped bundle: $BUNDLE_NAME"

# Create bundle structure
mkdir -p "$BUNDLE_DIR"/{bin,policies,certs,docs,scripts}

# Download the release tarball and extract the KafkaGuard binary
wget -O /tmp/kafkaguard.tar.gz \
  https://github.com/aiopsone/kafkaguard-releases/releases/download/v${VERSION}/kafkaguard_Linux_x86_64.tar.gz
tar -xzf /tmp/kafkaguard.tar.gz -C "$BUNDLE_DIR/bin/" kafkaguard  # adjust member name if the archive layout differs
chmod +x "$BUNDLE_DIR/bin/kafkaguard"

# Clone repository for policies and docs
git clone --depth 1 --branch v${VERSION} https://github.com/aiopsone/kafkaguard.git /tmp/kafkaguard-repo
cp -r /tmp/kafkaguard-repo/policies "$BUNDLE_DIR/"
cp -r /tmp/kafkaguard-repo/docs "$BUNDLE_DIR/"
cp /tmp/kafkaguard-repo/README.md "$BUNDLE_DIR/"

# Create installation script
cat > "$BUNDLE_DIR/install.sh" <<'EOF'
#!/bin/bash
# install.sh - Install KafkaGuard in air-gapped environment

set -e

echo "Installing KafkaGuard..."

# Install binary
sudo cp bin/kafkaguard /usr/local/bin/
sudo chmod +x /usr/local/bin/kafkaguard

# Install policies
sudo mkdir -p /opt/kafkaguard
sudo cp -r policies /opt/kafkaguard/
sudo cp -r docs /opt/kafkaguard/

# Verify installation
kafkaguard version

echo "✅ KafkaGuard installed successfully"
echo ""
echo "Next steps:"
echo "1. Copy certificates to /opt/kafkaguard/certs/"
echo "2. Run: kafkaguard scan --bootstrap <kafka-broker> --policy /opt/kafkaguard/policies/enterprise-default.yaml"
EOF

chmod +x "$BUNDLE_DIR/install.sh"

# Create manifest
cat > "$BUNDLE_DIR/MANIFEST.txt" <<EOF
KafkaGuard Air-Gapped Bundle
=============================

Version: $VERSION
Build Date: $BUILD_DATE
Platform: linux-amd64

Contents:
- bin/kafkaguard: KafkaGuard binary
- policies/: Policy files (baseline-dev, enterprise-default)
- docs/: Documentation
- install.sh: Installation script

Installation:
1. Extract bundle: tar -xzf ${BUNDLE_NAME}.tar.gz
2. Run: cd ${BUNDLE_NAME} && sudo ./install.sh
3. Verify: kafkaguard version

For full documentation, see docs/user-guide/index.md
EOF

# Create tarball (the bundle directory was created in the current directory)
tar -czf "${BUNDLE_NAME}.tar.gz" "$BUNDLE_NAME"

# Generate checksums
sha256sum "${BUNDLE_NAME}.tar.gz" > "${BUNDLE_NAME}.tar.gz.sha256"
md5sum "${BUNDLE_NAME}.tar.gz" > "${BUNDLE_NAME}.tar.gz.md5"

# Create verification script
cat > "verify-${BUNDLE_NAME}.sh" <<EOF
#!/bin/bash
# Verify bundle integrity

echo "Verifying bundle integrity..."

if sha256sum -c "${BUNDLE_NAME}.tar.gz.sha256"; then
    echo "✅ SHA256 checksum valid"
else
    echo "❌ SHA256 checksum FAILED"
    exit 1
fi

if md5sum -c "${BUNDLE_NAME}.tar.gz.md5"; then
    echo "✅ MD5 checksum valid"
else
    echo "❌ MD5 checksum FAILED"
    exit 1
fi

echo "✅ Bundle integrity verified"
EOF

chmod +x "verify-${BUNDLE_NAME}.sh"

# Clean up
rm -rf /tmp/kafkaguard-repo

echo "✅ Bundle created: ${BUNDLE_NAME}.tar.gz"
echo "   Size: $(du -h "${BUNDLE_NAME}.tar.gz" | cut -f1)"
echo "   SHA256: $(cat "${BUNDLE_NAME}.tar.gz.sha256" | cut -d' ' -f1)"
echo ""
echo "Verification script: verify-${BUNDLE_NAME}.sh"

Secure Transfer Procedures

Document secure transfer process:

  1. Generate checksums (SHA256, MD5)
  2. Encrypt bundle (GPG, AES-256)
  3. Transfer via approved method (USB, secure file transfer)
  4. Verify integrity on air-gapped system
  5. Decrypt and extract
  6. Validate checksums again
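
Steps 1, 2, 5, and 6 can be sketched with standard tools (the bundle name and passphrase handling below are illustrative; in practice the passphrase comes from your key-management process, never a script variable):

```shell
# Demo payload standing in for the real bundle tarball
printf 'demo bundle contents\n' > bundle.tar.gz
BUNDLE="bundle.tar.gz"
PASS="example-passphrase"   # illustrative only — never hard-code real passphrases

# 1. Generate checksum
sha256sum "$BUNDLE" > "$BUNDLE.sha256"

# 2. Encrypt with symmetric AES-256
gpg --batch --yes --pinentry-mode loopback --passphrase "$PASS" \
    --symmetric --cipher-algo AES256 -o "$BUNDLE.gpg" "$BUNDLE"

# ... transfer $BUNDLE.gpg and $BUNDLE.sha256 via the approved method ...

# 5. Decrypt on the air-gapped system
gpg --batch --yes --pinentry-mode loopback --passphrase "$PASS" \
    -o "$BUNDLE.dec" -d "$BUNDLE.gpg"

# 6. Validate the checksum against the decrypted copy
cmp "$BUNDLE" "$BUNDLE.dec" && sha256sum -c "$BUNDLE.sha256"
```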

Updating KafkaGuard in Air-Gapped Environments

Establish update procedures:

#!/bin/bash
# update-airgapped.sh - Update KafkaGuard in air-gapped environment

CURRENT_VERSION=$(kafkaguard version --format json | jq -r '.version')
NEW_BUNDLE="kafkaguard-airgapped-1.1.0-20251201.tar.gz"

echo "Current version: $CURRENT_VERSION"
echo "Updating to new bundle: $NEW_BUNDLE"

# Verify bundle
sha256sum -c "$NEW_BUNDLE.sha256" || exit 1

# Extract bundle
tar -xzf "$NEW_BUNDLE"
cd kafkaguard-airgapped-*

# Backup current installation
sudo cp /usr/local/bin/kafkaguard /usr/local/bin/kafkaguard.backup
sudo cp -r /opt/kafkaguard /opt/kafkaguard.backup

# Install new version
sudo ./install.sh

# Verify update
NEW_VERSION=$(kafkaguard version --format json | jq -r '.version')
echo "Updated to version: $NEW_VERSION"

# Test scan (capture the exit code immediately; 0 = clean, 1 = completed with findings)
kafkaguard scan \
  --bootstrap kafka-internal:9092 \
  --policy /opt/kafkaguard/policies/baseline-dev.yaml \
  --format json \
  --out /tmp/test-reports
SCAN_EXIT=$?

if [ "$SCAN_EXIT" -eq 0 ] || [ "$SCAN_EXIT" -eq 1 ]; then
    echo "✅ Update successful and verified"
else
    echo "❌ Update verification failed, rolling back..."
    sudo cp /usr/local/bin/kafkaguard.backup /usr/local/bin/kafkaguard
    exit 1
fi

Policy Updates in Air-Gapped Environments

Manage policy updates separately:

# Update only policies (no binary update)
cd /opt/kafkaguard
sudo cp -r policies policies.backup.$(date +%Y%m%d)
sudo cp -r /path/to/new/policies .

# Validate new policies
for policy in policies/*.yaml; do
    kafkaguard validate-policy --policy "$policy"
done

Versioning and Change Management

Maintain change log for air-gapped deployments:

# Air-Gapped Deployment Change Log

## 2025-11-15: v1.0.0 Initial Deployment
- Installed KafkaGuard v1.0.0
- Deployed policies: baseline-dev, enterprise-default
- Configured for kafka-internal:9092 cluster
- First scan: 95% score

## 2025-12-01: Policy Update
- Updated enterprise-default policy
- Added 2 custom controls (KG-101, KG-102)
- Re-scan: 93% score (new controls)

## 2025-12-15: v1.1.0 Upgrade
- Upgraded KafkaGuard to v1.1.0
- New features: improved report generation
- Verified: All scans working

Production Security Considerations

Least Privilege Access

KafkaGuard only needs read-only access to Kafka clusters:

# Required Kafka ACLs for KafkaGuard
# (--cluster takes no name; the cluster resource is implicitly "kafka-cluster")
kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --cluster

kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --topic '*'

kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --group '*'

NOT required:

  • ❌ WRITE permissions
  • ❌ DELETE permissions
  • ❌ ALTER permissions
  • ❌ CREATE permissions

Credential Rotation Procedures

Implement regular credential rotation:

#!/bin/bash
# rotate-credentials.sh - Rotate KafkaGuard credentials

NEW_PASSWORD=$(openssl rand -base64 32)

# Update password on Kafka broker
kafka-configs.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --alter --add-config "SCRAM-SHA-512=[password=$NEW_PASSWORD]" \
  --entity-type users --entity-name kafkaguard

# Update in Vault
vault kv put secret/kafka/prod password="$NEW_PASSWORD"

# Test new credentials (capture the exit code immediately; 0 = clean, 1 = completed with findings)
export KAFKAGUARD_SASL_PASSWORD="$NEW_PASSWORD"
kafkaguard scan \
  --bootstrap kafka:9095 \
  --security-protocol SASL_SSL \
  --sasl-mechanism SCRAM-SHA-512 \
  --sasl-username kafkaguard \
  --policy policies/enterprise-default.yaml \
  --format json \
  --out /tmp/test-reports
SCAN_EXIT=$?

if [ "$SCAN_EXIT" -eq 0 ] || [ "$SCAN_EXIT" -eq 1 ]; then
    echo "✅ Credential rotation successful"
else
    echo "❌ Credential rotation failed - reverting"
    # Revert to old password
    exit 1
fi

Audit Logging

Enable comprehensive audit logging:

#!/bin/bash
# Log all KafkaGuard executions

LOG_DIR="/var/log/kafkaguard"
mkdir -p "$LOG_DIR"

LOG_FILE="$LOG_DIR/audit-$(date +%Y%m%d).log"

# Log execution details ($* renders the arguments as a single string)
echo "$(date -Iseconds) | User: $(whoami) | Command: kafkaguard scan $*" >> "$LOG_FILE"

# Run scan with logging
kafkaguard scan "$@" 2>&1 | tee -a "$LOG_FILE"

# Capture kafkaguard's exit code, not tee's
EXIT_CODE=${PIPESTATUS[0]}

# Log result
echo "$(date -Iseconds) | Exit Code: $EXIT_CODE" >> "$LOG_FILE"

exit $EXIT_CODE

Log Retention:

  • Development: 30 days
  • Production: 90+ days (PCI-DSS requirement)
  • Compliance: Per regulatory requirements (1-7 years)
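
Rotation of these audit logs can be delegated to logrotate; a sketch for the 90-day production tier (the path matches the audit script above; adjust the rotation count to your retention tier):

```
# Hypothetical /etc/logrotate.d/kafkaguard entry
/var/log/kafkaguard/audit-*.log {
    daily
    rotate 90
    compress
    missingok
    notifempty
}
```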

Network Segmentation

Run KafkaGuard in management/monitoring network:

  • DO: Run KafkaGuard from dedicated management network
  • DO: Restrict network access to Kafka brokers (firewall rules)
  • DO: Use VPN for remote scanning
  • DON'T: Run from untrusted networks
  • DON'T: Allow KafkaGuard to be accessible from internet

Report Access Control

Protect reports containing sensitive cluster information:

# Set restrictive permissions on reports
chmod 600 reports/*.pdf
chmod 600 reports/*.json

# Store reports in access-controlled directory
REPORTS_DIR="/var/reports/kafkaguard"
sudo chown -R kafkaguard:security "$REPORTS_DIR"
sudo chmod 750 "$REPORTS_DIR"

Report Distribution:

  • ✅ Use secure file transfer (SFTP, S3 with encryption)
  • ✅ Encrypt reports before sharing (GPG, ZIP with password)
  • ✅ Track report access (audit logs)
  • ❌ Don't email reports unencrypted
  • ❌ Don't store reports in public locations

Document Information

  • Last Updated: 2025-11-15
  • Applies to Version: KafkaGuard 1.0.0+
  • Feedback: Open an issue for improvements