Advanced Usage Guide
Advanced topics, integrations, and best practices for power users and production deployments.
Table of Contents
- Custom Policy Creation
- Report Customization
- Performance Tuning
- Monitoring System Integration
- Ticketing System Integration
- Automation with Scripting
- Air-Gapped Deployment Best Practices
- Production Security Considerations
Custom Policy Creation
Overview
While KafkaGuard provides three built-in policy tiers (baseline-dev, enterprise-default, and finance-iso in Phase 2), you can create custom policies tailored to your organization's specific requirements.
When to Create Custom Policies
Create custom policies when you need to:
- ✅ Enforce organization-specific compliance requirements
- ✅ Combine controls from multiple tiers
- ✅ Add custom controls for internal standards
- ✅ Adjust control severity levels for your environment
- ✅ Create environment-specific policies (dev, staging, prod)
Quick Policy Structure Reference
version: "1.0"
name: "My Custom Policy"
description: "Custom policy for our environment"
tier: "custom"

controls:
  - id: KG-XXX
    title: "Control title"
    description: "What this control checks"
    severity: HIGH | MEDIUM | LOW
    category: security | reliability | operational | compliance
    expr: |
      # CEL expression that returns true (pass) or false (fail)
      broker.config["auto.create.topics.enable"] == "false"
    remediation: |
      Step-by-step fix instructions
    compliance:
      pci_dss: ["4.1", "8.2"]
      soc2: ["CC6.1"]
      iso27001: ["A.12.6.1"]
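To make the pass/fail semantics concrete, here is a minimal Python sketch of what a control evaluation amounts to. The real engine evaluates CEL expressions against the collected cluster state; `evaluate_control` is a hypothetical illustration of the semantics only, not KafkaGuard's implementation:

```python
# Hypothetical sketch: a control "passes" when the broker config matches
# every expected key/value pair. KafkaGuard itself evaluates CEL expressions;
# this only illustrates the true-means-pass contract described above.
def evaluate_control(broker_config: dict, expected: dict) -> bool:
    return all(broker_config.get(key) == value for key, value in expected.items())

broker_config = {"auto.create.topics.enable": "false", "log.retention.hours": "168"}

print(evaluate_control(broker_config, {"auto.create.topics.enable": "false"}))  # True (pass)
print(evaluate_control(broker_config, {"auto.create.topics.enable": "true"}))   # False (fail)
```

A missing key also fails the control, which mirrors how an unset broker setting should be treated as non-compliant rather than silently passing.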
Comprehensive Policy Creation Guide
For complete documentation on creating custom policies, see:
→ Policy Creation Guide (Story 9.3)
This comprehensive guide covers:
- Policy file structure and schema
- CEL expression syntax and examples
- Available variables (broker, topic, cluster)
- Control ID conventions (KG-001 to KG-999)
- Severity and category guidelines
- Compliance mapping (PCI-DSS, SOC2, ISO 27001)
- Testing and validation
- Examples and templates
Policy Validation
Always validate custom policies before deployment:
# Validate policy syntax
kafkaguard validate-policy --policy policies/custom-policy.yaml
# Test policy against dev cluster first
kafkaguard scan \
--bootstrap kafka-dev:9092 \
--policy policies/custom-policy.yaml \
--format json \
--out ./test-reports
# Review results and iterate
cat test-reports/scan-*.json | jq '.findings[] | select(.status == "FAILED")'
Report Customization
Selecting Report Formats
Generate specific report formats based on your needs:
# Single format
kafkaguard scan --bootstrap kafka:9092 --format json
# Multiple formats
kafkaguard scan --bootstrap kafka:9092 --format json,html,pdf
# All formats
kafkaguard scan --bootstrap kafka:9092 --format json,html,pdf,csv
Output Directory Management
Organize reports by environment, date, or cluster:
# By environment
kafkaguard scan \
--bootstrap kafka-prod:9095 \
--out /var/reports/prod
# By date
REPORT_DIR="/var/reports/$(date +%Y-%m-%d)"
mkdir -p "$REPORT_DIR"
kafkaguard scan \
--bootstrap kafka:9092 \
--out "$REPORT_DIR"
# By cluster
kafkaguard scan \
--bootstrap kafka-cluster1:9092 \
--out /var/reports/cluster1
Report Naming Convention
KafkaGuard uses a consistent naming pattern:
scan-<timestamp>-<scan_id>.<format>
Example:
scan-20251115140530-abc123def456.json
scan-20251115140530-abc123def456.html
scan-20251115140530-abc123def456.pdf
scan-20251115140530-abc123def456.csv
Benefits:
- Chronological sorting (timestamp first)
- Easy correlation across formats (same scan_id)
- Unique identifiers prevent overwrites
- Glob pattern matching (scan-*.json)
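Because the naming convention is fixed, report filenames can be parsed mechanically when building tooling around the reports directory. A small Python sketch; the regex is an assumption derived from the example filenames above:

```python
import re

# scan-<timestamp>-<scan_id>.<format>, e.g. scan-20251115140530-abc123def456.json
# The 14-digit timestamp and lowercase hex scan_id are inferred from the examples.
PATTERN = re.compile(r"^scan-(?P<timestamp>\d{14})-(?P<scan_id>[0-9a-z]+)\.(?P<format>\w+)$")

def parse_report_name(filename: str) -> dict:
    """Split a KafkaGuard report filename into its components."""
    match = PATTERN.match(filename)
    if match is None:
        raise ValueError(f"not a KafkaGuard report name: {filename}")
    return match.groupdict()

parts = parse_report_name("scan-20251115140530-abc123def456.json")
print(parts)  # {'timestamp': '20251115140530', 'scan_id': 'abc123def456', 'format': 'json'}
```

Grouping by `scan_id` is a convenient way to correlate the JSON, HTML, PDF, and CSV outputs of a single scan.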
Report Archival Strategy
Implement report retention policies:
#!/bin/bash
# archive-reports.sh - Automated report archival
REPORTS_DIR="/var/reports/kafkaguard"
ARCHIVE_DIR="/var/reports/kafkaguard/archive"
RETENTION_DAYS=90
# Create archive structure
mkdir -p "$ARCHIVE_DIR/$(date +%Y-%m)"
# Archive reports older than 30 days
find "$REPORTS_DIR" -maxdepth 1 -name "scan-*" -mtime +30 \
-exec mv {} "$ARCHIVE_DIR/$(date +%Y-%m)/" \;
# Compress archived reports older than 60 days
find "$ARCHIVE_DIR" -name "scan-*.pdf" -mtime +60 -exec gzip {} \;
# Delete reports older than retention period
find "$ARCHIVE_DIR" -name "scan-*.gz" -mtime +$RETENTION_DAYS -delete
echo "Report archival complete"
Custom Report Templates (Phase 2)
Note: Custom report templates are planned for Phase 2. Currently, report formats use built-in templates.
Planned Features:
- Custom HTML templates with organization branding
- Custom PDF layouts and styling
- Template variables for organization info
- Export templates for reuse
Performance Tuning
Timeout Configuration
Adjust timeout based on cluster size and network conditions:
# Default timeout: 300 seconds (5 minutes)
kafkaguard scan --bootstrap kafka:9092
# Small clusters (<100 topics): 300 seconds
kafkaguard scan --bootstrap kafka:9092 --timeout 300
# Medium clusters (100-500 topics): 600 seconds (10 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 600
# Large clusters (500-1000 topics): 900 seconds (15 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 900
# Very large clusters (1000+ topics): 1800 seconds (30 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 1800
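For scripted scans, the tiers above can be encoded so the timeout scales with the topic count instead of being hard-coded per cluster. A sketch; the thresholds are the guidance from this section, not a KafkaGuard feature:

```python
def recommended_timeout(topic_count: int) -> int:
    """Timeout in seconds, following the sizing tiers documented above."""
    if topic_count < 100:
        return 300    # small clusters
    if topic_count < 500:
        return 600    # medium clusters
    if topic_count < 1000:
        return 900    # large clusters
    return 1800       # very large clusters

print(recommended_timeout(250))  # 600
```

The returned value can then be passed to `--timeout` by whatever wrapper script drives the scan.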
Parallel Collection Settings
KafkaGuard uses parallel collection by default for faster scans:
# Default: parallel enabled with 6 max collectors
kafkaguard scan --bootstrap kafka:9092
# Increase max collectors for large clusters (10+ brokers)
kafkaguard scan \
--bootstrap kafka:9092 \
--parallel true \
--max-collectors 10
# Reduce max collectors if experiencing memory issues
kafkaguard scan \
--bootstrap kafka:9092 \
--max-collectors 3
# Disable parallel collection (not recommended, slower)
kafkaguard scan \
--bootstrap kafka:9092 \
--parallel false
Memory Optimization
Monitor and optimize memory usage:
# Monitor memory during scan
top -p $(pgrep kafkaguard)
# Set resource limits (Linux)
ulimit -v 524288 # Limit to 512MB virtual memory
kafkaguard scan --bootstrap kafka:9092
# Docker: Set memory limits
docker run --rm \
--memory=256m \
--cpus=1.0 \
-v $(pwd)/policies:/policies:ro \
-v $(pwd)/reports:/reports \
aiopsone/kafkaguard:latest scan \
--bootstrap kafka:9092 \
--policy /policies/enterprise-default.yaml \
--out /reports
Expected Memory Usage:
| Cluster Size | Expected Memory |
|---|---|
| 3 brokers, <100 topics | 50-100 MB |
| 5 brokers, 100-500 topics | 100-150 MB |
| 10 brokers, 500-1000 topics | 150-200 MB |
| 20+ brokers, 1000+ topics | 200-300 MB |
If memory usage exceeds these ranges, please report to the KafkaGuard team.
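When wiring memory checks into monitoring, the table above can be flattened into a threshold lookup so an alert fires only when usage leaves the documented range. A hedged sketch; the tier boundaries are read off the table, and the helper itself is not part of KafkaGuard:

```python
# Upper memory bounds (MB) read from the sizing table above.
MEMORY_TIERS = [
    (3, 100, 100),                       # 3 brokers, <100 topics
    (5, 500, 150),                       # 5 brokers, 100-500 topics
    (10, 1000, 200),                     # 10 brokers, 500-1000 topics
    (float("inf"), float("inf"), 300),   # 20+ brokers, 1000+ topics
]

def memory_within_expected(brokers: int, topics: int, observed_mb: float) -> bool:
    """True if observed memory falls within the documented range for this size."""
    for max_brokers, max_topics, high_mb in MEMORY_TIERS:
        if brokers <= max_brokers and topics <= max_topics:
            return observed_mb <= high_mb
    return False

print(memory_within_expected(3, 80, 90))   # True
print(memory_within_expected(3, 80, 400))  # False -> worth reporting
```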
Network Latency Considerations
Optimize for high-latency networks:
# High latency network (>100ms RTT)
kafkaguard scan \
--bootstrap kafka:9092 \
--timeout 900 \
--max-collectors 3 # Reduce concurrent connections
# Monitor network latency
ping -c 10 kafka.example.com
Running Scans During Off-Peak Hours
Schedule scans during low-traffic periods:
# Cron job for 2 AM daily scan. Note: cron does not support backslash
# line continuation, so the entry must be a single line.
# /etc/cron.d/kafkaguard-nightly
0 2 * * * kafkaguard /usr/local/bin/kafkaguard scan --bootstrap kafka-prod:9095 --policy /opt/kafkaguard/policies/enterprise-default.yaml --out /var/reports/kafkaguard/nightly --timeout 900 2>&1 | logger -t kafkaguard
Monitoring System Integration
Splunk Integration
Parse JSON reports and forward to Splunk for analysis:
#!/bin/bash
# splunk-forward.sh - Forward KafkaGuard reports to Splunk
REPORTS_DIR="/var/reports/kafkaguard"
SPLUNK_HEC_URL="https://splunk.example.com:8088/services/collector"
SPLUNK_HEC_TOKEN="your-hec-token"
# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)
# Extract findings and forward to Splunk
cat "$LATEST_JSON" | jq -c '.findings[]' | while read -r finding; do
curl -k "$SPLUNK_HEC_URL" \
-H "Authorization: Splunk $SPLUNK_HEC_TOKEN" \
-d "{\"event\": $finding, \"sourcetype\": \"kafkaguard:scan\"}"
done
echo "Forwarded $(jq '.findings | length' "$LATEST_JSON") findings to Splunk"
Splunk Search Queries:
# All KafkaGuard findings
sourcetype=kafkaguard:scan
# Failed controls only
sourcetype=kafkaguard:scan status=FAILED
# HIGH severity failures
sourcetype=kafkaguard:scan status=FAILED severity=HIGH
# Failures by control ID
sourcetype=kafkaguard:scan status=FAILED | stats count by control_id
# Trend over time
sourcetype=kafkaguard:scan | timechart count by status
ELK Stack Integration
Index KafkaGuard reports in Elasticsearch for analysis and visualization:
#!/bin/bash
# elk-index.sh - Index KafkaGuard reports in Elasticsearch
REPORTS_DIR="/var/reports/kafkaguard"
ELASTICSEARCH_URL="http://elasticsearch:9200"
INDEX_NAME="kafkaguard-findings"
# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)
# Extract metadata and findings
METADATA=$(cat "$LATEST_JSON" | jq -c '.metadata')
FINDINGS=$(cat "$LATEST_JSON" | jq -c '.findings[]')
# Index each finding
echo "$FINDINGS" | while read -r finding; do
# Combine metadata with finding
DOCUMENT=$(jq -n \
--argjson metadata "$METADATA" \
--argjson finding "$finding" \
'$metadata + $finding')
# Index in Elasticsearch
curl -X POST "$ELASTICSEARCH_URL/$INDEX_NAME/_doc" \
-H 'Content-Type: application/json' \
-d "$DOCUMENT"
done
echo "Indexed $(jq '.findings | length' "$LATEST_JSON") findings in Elasticsearch"
Kibana Visualizations:
- Control Status Pie Chart: status field
- Severity Distribution Bar Chart: severity field
- Failed Controls Table: filter status:FAILED, show control_id, title, severity
- Score Trend Line Chart: metadata.timestamp vs summary.score
- Compliance Heatmap: compliance.pci_dss, compliance.soc2, compliance.iso27001
Custom Monitoring with JSON Parsing
Extract specific metrics from JSON reports:
#!/bin/bash
# extract-metrics.sh - Extract metrics from KafkaGuard JSON report
LATEST_JSON=$(ls -t reports/scan-*.json | head -1)
# Extract overall score
SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
echo "Overall Score: $SCORE%"
# Extract control counts
TOTAL=$(cat "$LATEST_JSON" | jq -r '.summary.total_controls')
PASSED=$(cat "$LATEST_JSON" | jq -r '.summary.passed')
FAILED=$(cat "$LATEST_JSON" | jq -r '.summary.failed')
echo "Controls: $PASSED passed, $FAILED failed (out of $TOTAL)"
# Extract HIGH severity failures
FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')
echo "HIGH severity failures: $FAILED_HIGH"
# Export as Prometheus metrics (text format)
cat > /var/lib/node_exporter/kafkaguard.prom <<EOF
# HELP kafkaguard_scan_score Overall compliance score (0-100)
# TYPE kafkaguard_scan_score gauge
kafkaguard_scan_score $SCORE
# HELP kafkaguard_controls_total Total number of controls evaluated
# TYPE kafkaguard_controls_total gauge
kafkaguard_controls_total $TOTAL
# HELP kafkaguard_controls_passed Number of passed controls
# TYPE kafkaguard_controls_passed gauge
kafkaguard_controls_passed $PASSED
# HELP kafkaguard_controls_failed Number of failed controls
# TYPE kafkaguard_controls_failed gauge
kafkaguard_controls_failed $FAILED
# HELP kafkaguard_failures_high Number of HIGH severity failures
# TYPE kafkaguard_failures_high gauge
kafkaguard_failures_high $FAILED_HIGH
EOF
Prometheus Metrics Export (Phase 2)
Note: Native Prometheus metrics export is planned for Phase 2.
Planned Metrics:
- kafkaguard_scan_duration_seconds - Scan duration
- kafkaguard_scan_score - Overall compliance score
- kafkaguard_controls_total - Total controls evaluated
- kafkaguard_controls_passed - Passed controls count
- kafkaguard_controls_failed - Failed controls count
- kafkaguard_failures_by_severity - Failures by severity (labels: high, medium, low)
- kafkaguard_scan_timestamp - Last scan timestamp
Ticketing System Integration
Jira Integration
Automatically create Jira tickets for failed controls:
#!/usr/bin/env python3
# jira-create-tickets.py - Create Jira tickets from KafkaGuard findings

import json
import sys

from jira import JIRA

# Jira configuration
JIRA_SERVER = 'https://jira.example.com'
JIRA_USERNAME = 'kafkaguard-bot'
JIRA_API_TOKEN = 'your-api-token'
JIRA_PROJECT = 'SEC'  # Security project

# Connect to Jira
jira = JIRA(server=JIRA_SERVER, basic_auth=(JIRA_USERNAME, JIRA_API_TOKEN))

# Load latest KafkaGuard report
with open(sys.argv[1]) as f:
    report = json.load(f)

# Process HIGH severity failed controls
failed_findings = [f for f in report['findings']
                   if f['status'] == 'FAILED' and f['severity'] == 'HIGH']

for finding in failed_findings:
    # Check if ticket already exists (avoid duplicates)
    jql = f'project = {JIRA_PROJECT} AND summary ~ "{finding["control_id"]}"'
    existing = jira.search_issues(jql)
    if existing:
        print(f"Ticket already exists for {finding['control_id']}: {existing[0].key}")
        continue

    # Create new ticket
    issue_dict = {
        'project': {'key': JIRA_PROJECT},
        'summary': f"{finding['control_id']}: {finding['title']}",
        'description': f"""
Kafka compliance control failed:

*Control ID:* {finding['control_id']}
*Severity:* {finding['severity']}
*Category:* {finding['category']}

*Evidence:*
{finding['evidence']}

*Remediation Steps:*
{finding['remediation']}

*Compliance Impact:*
- PCI-DSS: {', '.join(finding['compliance']['pci_dss'])}
- SOC2: {', '.join(finding['compliance']['soc2'])}
- ISO 27001: {', '.join(finding['compliance']['iso27001'])}

*Scan Details:*
- Cluster: {report['metadata']['cluster_id']}
- Timestamp: {report['metadata']['timestamp']}
- Policy: {report['metadata']['policy']}
""",
        'issuetype': {'name': 'Bug'},
        'priority': {'name': 'High'},
        'labels': ['kafkaguard', 'compliance', 'security']
    }
    new_issue = jira.create_issue(fields=issue_dict)
    print(f"Created ticket {new_issue.key} for {finding['control_id']}")

print(f"Processed {len(failed_findings)} HIGH severity findings")
Usage:
# Run scan and create tickets
kafkaguard scan \
--bootstrap kafka-prod:9095 \
--policy policies/enterprise-default.yaml \
--format json \
--out /var/reports
LATEST_JSON=$(ls -t /var/reports/scan-*.json | head -1)
python3 jira-create-tickets.py "$LATEST_JSON"
ServiceNow Integration
Create ServiceNow incidents from KafkaGuard findings:
#!/bin/bash
# servicenow-create-incidents.sh - Create ServiceNow incidents
REPORTS_DIR="/var/reports/kafkaguard"
SNOW_INSTANCE="https://your-instance.service-now.com"
SNOW_USERNAME="kafkaguard-integration"
SNOW_PASSWORD="your-password"
# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)
# Extract HIGH severity failures
cat "$LATEST_JSON" | jq -c '.findings[] | select(.status == "FAILED" and .severity == "HIGH")' | while read -r finding; do
CONTROL_ID=$(echo "$finding" | jq -r '.control_id')
TITLE=$(echo "$finding" | jq -r '.title')
EVIDENCE=$(echo "$finding" | jq -r '.evidence')
REMEDIATION=$(echo "$finding" | jq -r '.remediation')
# Build the JSON payload with jq so quotes and newlines in the
# evidence/remediation text are escaped safely
PAYLOAD=$(jq -n \
--arg sd "Kafka Compliance: $CONTROL_ID - $TITLE" \
--arg ev "$EVIDENCE" \
--arg rem "$REMEDIATION" \
'{short_description: $sd, description: ("Evidence: " + $ev + "\n\nRemediation: " + $rem), urgency: "2", impact: "2", priority: "2", category: "Security", subcategory: "Compliance"}')
# Create ServiceNow incident
curl -X POST "$SNOW_INSTANCE/api/now/table/incident" \
-u "$SNOW_USERNAME:$SNOW_PASSWORD" \
-H "Content-Type: application/json" \
-d "$PAYLOAD"
echo "Created incident for $CONTROL_ID"
done
Automation with Scripting
Bash Script Example
Multi-cluster scanning with email notifications:
#!/bin/bash
# automated-compliance-scan.sh - Automated multi-cluster scanning with alerts
set -e
# Configuration
CLUSTERS=("kafka-dev:9092" "kafka-staging:9095" "kafka-prod:9095")
POLICIES=("baseline-dev.yaml" "enterprise-default.yaml" "enterprise-default.yaml")
REPORTS_BASE="/var/reports/kafkaguard"
ALERT_EMAIL="security@example.com"
# Timestamp for this scan run
TIMESTAMP=$(date +%Y%m%d-%H%M%S)
SUMMARY_FILE="/tmp/scan-summary-$TIMESTAMP.txt"
echo "KafkaGuard Automated Compliance Scan - $TIMESTAMP" > "$SUMMARY_FILE"
echo "=================================================" >> "$SUMMARY_FILE"
echo "" >> "$SUMMARY_FILE"
# Scan each cluster
for i in "${!CLUSTERS[@]}"; do
CLUSTER="${CLUSTERS[$i]}"
POLICY="${POLICIES[$i]}"
CLUSTER_NAME=$(echo "$CLUSTER" | cut -d: -f1)
echo "Scanning $CLUSTER_NAME..." | tee -a "$SUMMARY_FILE"
REPORT_DIR="$REPORTS_BASE/$CLUSTER_NAME/$TIMESTAMP"
mkdir -p "$REPORT_DIR"
# Run scan
kafkaguard scan \
--bootstrap "$CLUSTER" \
--policy "policies/$POLICY" \
--format json,html \
--out "$REPORT_DIR" \
--log-level info || true
# Parse results
LATEST_JSON=$(ls -t "$REPORT_DIR"/scan-*.json | head -1)
if [ -f "$LATEST_JSON" ]; then
SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
FAILED=$(cat "$LATEST_JSON" | jq -r '.summary.failed')
FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')
echo " Score: $SCORE%" | tee -a "$SUMMARY_FILE"
echo " Failed Controls: $FAILED" | tee -a "$SUMMARY_FILE"
echo " HIGH Severity Failures: $FAILED_HIGH" | tee -a "$SUMMARY_FILE"
if [ "$FAILED_HIGH" -gt 0 ]; then
echo " ⚠️ ALERT: HIGH severity failures detected!" | tee -a "$SUMMARY_FILE"
fi
else
echo " ❌ Scan failed!" | tee -a "$SUMMARY_FILE"
fi
echo "" | tee -a "$SUMMARY_FILE"
done
# Send email summary
mail -s "KafkaGuard Compliance Scan Summary - $TIMESTAMP" "$ALERT_EMAIL" < "$SUMMARY_FILE"
echo "Scan complete. Summary sent to $ALERT_EMAIL"
Python Script Example
Advanced report aggregation and analysis:
#!/usr/bin/env python3
# aggregate-reports.py - Aggregate and analyze KafkaGuard reports

import json
import glob
from collections import defaultdict

class ReportAggregator:
    def __init__(self, reports_dir):
        self.reports_dir = reports_dir
        self.reports = []

    def load_reports(self):
        """Load all JSON reports"""
        for report_file in glob.glob(f"{self.reports_dir}/**/scan-*.json", recursive=True):
            with open(report_file) as f:
                report = json.load(f)
            report['_file'] = report_file
            self.reports.append(report)
        print(f"Loaded {len(self.reports)} reports")

    def aggregate_scores(self):
        """Calculate aggregate scores"""
        scores = [r['summary']['score'] for r in self.reports]
        if not scores:  # guard against an empty reports directory
            return {'avg_score': 0, 'min_score': 0, 'max_score': 0, 'total_reports': 0}
        return {
            'avg_score': sum(scores) / len(scores),
            'min_score': min(scores),
            'max_score': max(scores),
            'total_reports': len(scores)
        }

    def aggregate_failures_by_control(self):
        """Count failures by control ID"""
        failures = defaultdict(int)
        for report in self.reports:
            for finding in report['findings']:
                if finding['status'] == 'FAILED':
                    failures[finding['control_id']] += 1
        # Sort by frequency
        return sorted(failures.items(), key=lambda x: x[1], reverse=True)

    def aggregate_failures_by_severity(self):
        """Count failures by severity"""
        severity_counts = defaultdict(int)
        for report in self.reports:
            for finding in report['findings']:
                if finding['status'] == 'FAILED':
                    severity_counts[finding['severity']] += 1
        return dict(severity_counts)

    def generate_trend_data(self):
        """Generate score trend over time"""
        trend = []
        for report in sorted(self.reports, key=lambda r: r['metadata']['timestamp']):
            trend.append({
                'timestamp': report['metadata']['timestamp'],
                'score': report['summary']['score'],
                'failed': report['summary']['failed']
            })
        return trend

    def generate_summary(self):
        """Generate comprehensive summary"""
        print("\nKafkaGuard Report Aggregation Summary")
        print("=" * 50)

        # Scores
        scores = self.aggregate_scores()
        print("\nOverall Scores:")
        print(f"  Average: {scores['avg_score']:.1f}%")
        print(f"  Min: {scores['min_score']}%")
        print(f"  Max: {scores['max_score']}%")
        print(f"  Total Reports: {scores['total_reports']}")

        # Top failures
        print("\nTop 10 Failed Controls:")
        failures = self.aggregate_failures_by_control()
        for control_id, count in failures[:10]:
            print(f"  {control_id}: {count} failures")

        # Severity distribution
        print("\nFailures by Severity:")
        severity = self.aggregate_failures_by_severity()
        for sev, count in severity.items():
            print(f"  {sev}: {count}")

        # Trend
        print("\nScore Trend (last 5 scans):")
        trend = self.generate_trend_data()
        for entry in trend[-5:]:
            print(f"  {entry['timestamp']}: {entry['score']}% ({entry['failed']} failed)")

if __name__ == '__main__':
    import sys
    reports_dir = sys.argv[1] if len(sys.argv) > 1 else '/var/reports/kafkaguard'
    aggregator = ReportAggregator(reports_dir)
    aggregator.load_reports()
    aggregator.generate_summary()
Usage:
python3 aggregate-reports.py /var/reports/kafkaguard
PowerShell Example (Windows)
# kafkaguard-scan.ps1 - Windows PowerShell automation
param(
[string]$Bootstrap = "kafka.example.com:9092",
[string]$Policy = "policies/enterprise-default.yaml",
[string]$OutDir = "C:\Reports\KafkaGuard"
)
Write-Host "KafkaGuard Automated Scan" -ForegroundColor Green
Write-Host "=========================" -ForegroundColor Green
# Create reports directory
New-Item -ItemType Directory -Force -Path $OutDir | Out-Null
# Run scan
Write-Host "Scanning $Bootstrap..."
& kafkaguard scan `
--bootstrap $Bootstrap `
--policy $Policy `
--format json,html `
--out $OutDir `
--log-level info
# Parse results
$LatestJson = Get-ChildItem -Path $OutDir -Filter "scan-*.json" | Sort-Object LastWriteTime -Descending | Select-Object -First 1
if ($LatestJson) {
$Report = Get-Content $LatestJson.FullName | ConvertFrom-Json
$Score = $Report.summary.score
$Failed = $Report.summary.failed
$FailedHigh = ($Report.findings | Where-Object { $_.status -eq "FAILED" -and $_.severity -eq "HIGH" }).Count
Write-Host ""
Write-Host "Results:" -ForegroundColor Yellow
Write-Host " Score: $Score%"
Write-Host " Failed Controls: $Failed"
Write-Host " HIGH Severity Failures: $FailedHigh"
if ($FailedHigh -gt 0) {
Write-Host " WARNING: HIGH severity failures detected!" -ForegroundColor Red
}
}
else {
Write-Host "ERROR: Scan failed!" -ForegroundColor Red
exit 1
}
Write-Host ""
Write-Host "Scan complete. Reports saved to $OutDir"
Air-Gapped Deployment Best Practices
Bundle Creation and Verification
Create reproducible, versioned bundles:
#!/bin/bash
# create-airgapped-bundle.sh - Create versioned air-gapped bundle
VERSION="1.0.0"
BUILD_DATE=$(date +%Y%m%d)
BUNDLE_NAME="kafkaguard-airgapped-${VERSION}-${BUILD_DATE}"
BUNDLE_DIR="$BUNDLE_NAME"
echo "Creating KafkaGuard air-gapped bundle: $BUNDLE_NAME"
# Create bundle structure
mkdir -p "$BUNDLE_DIR"/{bin,policies,certs,docs,scripts}
# Download KafkaGuard release archive and extract the binary
# (the release asset is a tar.gz, not a bare binary)
wget -O /tmp/kafkaguard.tar.gz \
https://github.com/aiopsone/kafkaguard-releases/releases/download/v${VERSION}/kafkaguard_Linux_x86_64.tar.gz
tar -xzf /tmp/kafkaguard.tar.gz -C "$BUNDLE_DIR/bin" kafkaguard
chmod +x "$BUNDLE_DIR/bin/kafkaguard"
# Clone repository for policies and docs
git clone --depth 1 --branch v${VERSION} https://github.com/aiopsone/kafkaguard.git /tmp/kafkaguard-repo
cp -r /tmp/kafkaguard-repo/policies "$BUNDLE_DIR/"
cp -r /tmp/kafkaguard-repo/docs "$BUNDLE_DIR/"
cp /tmp/kafkaguard-repo/README.md "$BUNDLE_DIR/"
# Create installation script
cat > "$BUNDLE_DIR/install.sh" <<'EOF'
#!/bin/bash
# install.sh - Install KafkaGuard in air-gapped environment
set -e
echo "Installing KafkaGuard..."
# Install binary
sudo cp bin/kafkaguard /usr/local/bin/
sudo chmod +x /usr/local/bin/kafkaguard
# Install policies
sudo mkdir -p /opt/kafkaguard
sudo cp -r policies /opt/kafkaguard/
sudo cp -r docs /opt/kafkaguard/
# Verify installation
kafkaguard version
echo "✅ KafkaGuard installed successfully"
echo ""
echo "Next steps:"
echo "1. Copy certificates to /opt/kafkaguard/certs/"
echo "2. Run: kafkaguard scan --bootstrap <kafka-broker> --policy /opt/kafkaguard/policies/enterprise-default.yaml"
EOF
chmod +x "$BUNDLE_DIR/install.sh"
# Create manifest
cat > "$BUNDLE_DIR/MANIFEST.txt" <<EOF
KafkaGuard Air-Gapped Bundle
=============================
Version: $VERSION
Build Date: $BUILD_DATE
Platform: linux-amd64
Contents:
- bin/kafkaguard: KafkaGuard binary
- policies/: Policy files (baseline-dev, enterprise-default)
- docs/: Documentation
- install.sh: Installation script
Installation:
1. Extract bundle: tar -xzf ${BUNDLE_NAME}.tar.gz
2. Run: cd ${BUNDLE_NAME} && sudo ./install.sh
3. Verify: kafkaguard version
For full documentation, see docs/user-guide/index.md
EOF
# Create tarball (the bundle directory was created in the current
# working directory, so tar from here)
tar -czf "${BUNDLE_NAME}.tar.gz" "$BUNDLE_NAME"
# Generate checksums
sha256sum "${BUNDLE_NAME}.tar.gz" > "${BUNDLE_NAME}.tar.gz.sha256"
md5sum "${BUNDLE_NAME}.tar.gz" > "${BUNDLE_NAME}.tar.gz.md5"
# Create verification script
cat > "verify-${BUNDLE_NAME}.sh" <<EOF
#!/bin/bash
# Verify bundle integrity
echo "Verifying bundle integrity..."
if sha256sum -c "${BUNDLE_NAME}.tar.gz.sha256"; then
echo "✅ SHA256 checksum valid"
else
echo "❌ SHA256 checksum FAILED"
exit 1
fi
if md5sum -c "${BUNDLE_NAME}.tar.gz.md5"; then
echo "✅ MD5 checksum valid"
else
echo "❌ MD5 checksum FAILED"
exit 1
fi
echo "✅ Bundle integrity verified"
EOF
chmod +x "verify-${BUNDLE_NAME}.sh"
# Clean up
rm -rf /tmp/kafkaguard-repo
echo "✅ Bundle created: ${BUNDLE_NAME}.tar.gz"
echo " Size: $(du -h "${BUNDLE_NAME}.tar.gz" | cut -f1)"
echo " SHA256: $(cat "${BUNDLE_NAME}.tar.gz.sha256" | cut -d' ' -f1)"
echo ""
echo "Verification script: verify-${BUNDLE_NAME}.sh"
Secure Transfer Procedures
Document secure transfer process:
- Generate checksums (SHA256, MD5)
- Encrypt bundle (GPG, AES-256)
- Transfer via approved method (USB, secure file transfer)
- Verify integrity on air-gapped system
- Decrypt and extract
- Validate checksums again
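Steps 1 and 6 (checksum generation and re-validation) can also be done without `sha256sum`, which helps on minimal air-gapped hosts. A Python sketch using only the standard library:

```python
import hashlib

def sha256_of(path: str) -> str:
    """Stream a file through SHA-256 and return the hex digest."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Read in chunks so large bundles don't need to fit in memory
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()

def verify(path: str, expected_hex: str) -> bool:
    """Re-validate a transferred file against the recorded checksum."""
    return sha256_of(path) == expected_hex
```

Compare the result against the `.tar.gz.sha256` file produced when the bundle was built; a mismatch means the transfer should be repeated, not worked around.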
Updating KafkaGuard in Air-Gapped Environments
Establish update procedures:
#!/bin/bash
# update-airgapped.sh - Update KafkaGuard in air-gapped environment
CURRENT_VERSION=$(kafkaguard version --format json | jq -r '.version')
NEW_BUNDLE="kafkaguard-airgapped-1.1.0-20251201.tar.gz"
echo "Current version: $CURRENT_VERSION"
echo "Updating to new bundle: $NEW_BUNDLE"
# Verify bundle
sha256sum -c "$NEW_BUNDLE.sha256" || exit 1
# Extract bundle
tar -xzf "$NEW_BUNDLE"
cd kafkaguard-airgapped-*
# Backup current installation
sudo cp /usr/local/bin/kafkaguard /usr/local/bin/kafkaguard.backup
sudo cp -r /opt/kafkaguard /opt/kafkaguard.backup
# Install new version
sudo ./install.sh
# Verify update
NEW_VERSION=$(kafkaguard version --format json | jq -r '.version')
echo "Updated to version: $NEW_VERSION"
# Test scan (exit code 0 = all controls passed, 1 = findings reported;
# either means the binary works). Capture the exit code once: a second
# $? would test the result of the first comparison, not the scan.
kafkaguard scan \
--bootstrap kafka-internal:9092 \
--policy /opt/kafkaguard/policies/baseline-dev.yaml \
--format json \
--out /tmp/test-reports
SCAN_EXIT=$?
if [ "$SCAN_EXIT" -eq 0 ] || [ "$SCAN_EXIT" -eq 1 ]; then
echo "✅ Update successful and verified"
else
echo "❌ Update verification failed, rolling back..."
sudo cp /usr/local/bin/kafkaguard.backup /usr/local/bin/kafkaguard
exit 1
fi
Policy Updates in Air-Gapped Environments
Manage policy updates separately:
# Update only policies (no binary update)
cd /opt/kafkaguard
sudo cp -r policies policies.backup.$(date +%Y%m%d)
sudo cp -r /path/to/new/policies .
# Validate new policies
for policy in policies/*.yaml; do
kafkaguard validate-policy --policy "$policy"
done
Versioning and Change Management
Maintain a change log for air-gapped deployments:
# Air-Gapped Deployment Change Log
## 2025-11-15: v1.0.0 Initial Deployment
- Installed KafkaGuard v1.0.0
- Deployed policies: baseline-dev, enterprise-default
- Configured for kafka-internal:9092 cluster
- First scan: 95% score
## 2025-12-01: Policy Update
- Updated enterprise-default policy
- Added 2 custom controls (KG-101, KG-102)
- Re-scan: 93% score (new controls)
## 2025-12-15: v1.1.0 Upgrade
- Upgraded KafkaGuard to v1.1.0
- New features: improved report generation
- Verified: All scans working
Production Security Considerations
Least Privilege Access
KafkaGuard only needs read-only access to Kafka clusters:
# Required Kafka ACLs for KafkaGuard
kafka-acls.sh --bootstrap-server kafka:9095 \
--command-config admin.properties \
--add --allow-principal User:kafkaguard \
--operation Describe --cluster kafka-cluster
kafka-acls.sh --bootstrap-server kafka:9095 \
--command-config admin.properties \
--add --allow-principal User:kafkaguard \
--operation Describe --topic '*'
kafka-acls.sh --bootstrap-server kafka:9095 \
--command-config admin.properties \
--add --allow-principal User:kafkaguard \
--operation Describe --group '*'
NOT required:
- ❌ WRITE permissions
- ❌ DELETE permissions
- ❌ ALTER permissions
- ❌ CREATE permissions
Credential Rotation Procedures
Implement regular credential rotation:
#!/bin/bash
# rotate-credentials.sh - Rotate KafkaGuard credentials
NEW_PASSWORD=$(openssl rand -base64 32)
# Update password on Kafka broker
kafka-configs.sh --bootstrap-server kafka:9095 \
--command-config admin.properties \
--alter --add-config "SCRAM-SHA-512=[password=$NEW_PASSWORD]" \
--entity-type users --entity-name kafkaguard
# Update in Vault
vault kv put secret/kafka/prod password="$NEW_PASSWORD"
# Test new credentials
export KAFKAGUARD_SASL_PASSWORD="$NEW_PASSWORD"
kafkaguard scan \
--bootstrap kafka:9095 \
--security-protocol SASL_SSL \
--sasl-mechanism SCRAM-SHA-512 \
--sasl-username kafkaguard \
--policy policies/enterprise-default.yaml \
--format json \
--out /tmp/test-reports
# Capture the exit code once; a second $? would test the first comparison
SCAN_EXIT=$?
if [ "$SCAN_EXIT" -eq 0 ] || [ "$SCAN_EXIT" -eq 1 ]; then
echo "✅ Credential rotation successful"
else
echo "❌ Credential rotation failed - reverting"
# Revert to old password
exit 1
fi
Audit Logging
Enable comprehensive audit logging:
#!/bin/bash
# Log all KafkaGuard executions
LOG_DIR="/var/log/kafkaguard"
mkdir -p "$LOG_DIR"
LOG_FILE="$LOG_DIR/audit-$(date +%Y%m%d).log"
# Log execution details
echo "$(date -Iseconds) | User: $(whoami) | Command: kafkaguard scan $*" >> "$LOG_FILE"
# Run scan with logging
kafkaguard scan "$@" 2>&1 | tee -a "$LOG_FILE"
EXIT_CODE=${PIPESTATUS[0]}  # $? alone would capture tee's exit code, not kafkaguard's
# Log result
echo "$(date -Iseconds) | Exit Code: $EXIT_CODE" >> "$LOG_FILE"
exit $EXIT_CODE
Log Retention:
- Development: 30 days
- Production: 90+ days (PCI-DSS requirement)
- Compliance: Per regulatory requirements (1-7 years)
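The daily `audit-YYYYMMDD.log` naming used above makes retention enforcement a pure filename calculation, with no dependence on filesystem timestamps. A sketch; the retention windows come from the list above:

```python
from datetime import date, timedelta

def is_expired(log_name: str, retention_days: int, today: date) -> bool:
    """True when an audit-YYYYMMDD.log file is older than the retention window."""
    stamp = log_name.removeprefix("audit-").removesuffix(".log")
    logged = date(int(stamp[:4]), int(stamp[4:6]), int(stamp[6:8]))
    return (today - logged) > timedelta(days=retention_days)

print(is_expired("audit-20250101.log", 90, date(2025, 11, 15)))  # True -> delete
print(is_expired("audit-20251101.log", 90, date(2025, 11, 15)))  # False -> keep
```

Substituting 30 for `retention_days` gives the development policy, and larger windows cover the regulatory cases.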
Network Segmentation
Run KafkaGuard from a dedicated management/monitoring network:
- ✅ DO: Run KafkaGuard from dedicated management network
- ✅ DO: Restrict network access to Kafka brokers (firewall rules)
- ✅ DO: Use VPN for remote scanning
- ❌ DON'T: Run from untrusted networks
- ❌ DON'T: Allow KafkaGuard to be accessible from internet
Report Access Control
Protect reports containing sensitive cluster information:
# Set restrictive permissions on reports
chmod 600 reports/*.pdf
chmod 600 reports/*.json
# Store reports in access-controlled directory
REPORTS_DIR="/var/reports/kafkaguard"
sudo chown -R kafkaguard:security "$REPORTS_DIR"
sudo chmod 750 "$REPORTS_DIR"
Report Distribution:
- ✅ Use secure file transfer (SFTP, S3 with encryption)
- ✅ Encrypt reports before sharing (GPG, ZIP with password)
- ✅ Track report access (audit logs)
- ❌ Don't email reports unencrypted
- ❌ Don't store reports in public locations
Next Steps
For more information:
- User Guide Index - Return to main user guide
- Usage Patterns - Common workflows
- Troubleshooting - Resolve issues
- CLI Reference - Complete command reference
Document Information
- Last Updated: 2025-11-15
- Applies to Version: KafkaGuard 1.0.0+
- Feedback: Open an issue for improvements