# Advanced Usage Guide
Advanced topics, integrations, and best practices for power users and production deployments.
## Table of Contents
- Custom Policy Creation
- Performance Tuning
- Monitoring System Integration
- Automation with Scripting
- Production Security Considerations
- Next Steps
## Custom Policy Creation
While KafkaGuard provides three built-in policy tiers (baseline-dev, enterprise-default, and, arriving in Phase 2, finance-iso), you can create custom policies tailored to your organization's specific requirements.
### When to Create Custom Policies
Create custom policies when you need to:
- ✅ Enforce organization-specific compliance requirements
- ✅ Combine controls from multiple tiers
- ✅ Add custom controls for internal standards
- ✅ Adjust control severity levels for your environment
- ✅ Create environment-specific policies (dev, staging, prod)
### Quick Policy Structure Reference
```yaml
version: "1.0"
name: "My Custom Policy"
description: "Custom policy for our environment"
tier: "custom"

controls:
  - id: KG-XXX
    title: "Control title"
    description: "What this control checks"
    severity: HIGH | MEDIUM | LOW
    category: security | reliability | operational | compliance
    expr: |
      # CEL expression that returns true (pass) or false (fail)
      broker.config["auto.create.topics.enable"] == "false"
    remediation: |
      Step-by-step fix instructions
    compliance:
      pci_dss: ["4.1", "8.2"]
      soc2: ["CC6.1"]
      iso27001: ["A.12.6.1"]
```
### Policy Validation

Always validate custom policies before deployment:

```bash
# Validate policy syntax
kafkaguard validate-policy --policy policies/custom-policy.yaml

# Test policy against dev cluster first
kafkaguard scan \
  --bootstrap kafka-dev:9092 \
  --policy policies/custom-policy.yaml \
  --format json \
  --out ./test-reports

# Review results and iterate
cat test-reports/scan-*.json | jq '.findings[] | select(.status == "FAILED")'
```

For comprehensive documentation on creating custom policies, see the Policy Creation Guide in the repository.
## Performance Tuning

### Timeout Configuration

Adjust the timeout based on cluster size and network conditions:

```bash
# Default timeout: 300 seconds (5 minutes)
kafkaguard scan --bootstrap kafka:9092

# Small clusters (<100 topics): 300 seconds
kafkaguard scan --bootstrap kafka:9092 --timeout 300

# Medium clusters (100-500 topics): 600 seconds (10 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 600

# Large clusters (500-1000 topics): 900 seconds (15 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 900

# Very large clusters (1000+ topics): 1800 seconds (30 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 1800
```
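Not sure which bracket your cluster falls into? The stock Kafka CLI can count topics before you pick a timeout (kafka-topics.sh ships with the Apache Kafka distribution; the bootstrap address is the same one you pass to KafkaGuard):

```bash
# Count topics to pick a timeout bracket from the list above
kafka-topics.sh --bootstrap-server kafka:9092 --list | wc -l
```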
### Parallel Collection Settings

KafkaGuard uses parallel collection by default for faster scans:
```bash
# Default: parallel enabled with 6 max collectors
kafkaguard scan --bootstrap kafka:9092

# Increase max collectors for large clusters (10+ brokers)
kafkaguard scan \
  --bootstrap kafka:9092 \
  --parallel true \
  --max-collectors 10

# Reduce max collectors if experiencing memory issues
kafkaguard scan \
  --bootstrap kafka:9092 \
  --max-collectors 3

# Disable parallel collection (not recommended, slower)
kafkaguard scan \
  --bootstrap kafka:9092 \
  --parallel false
```
### Expected Memory Usage

| Cluster Size | Expected Memory |
|---|---|
| 3 brokers, <100 topics | 50-100 MB |
| 5 brokers, 100-500 topics | 100-150 MB |
| 10 brokers, 500-1000 topics | 150-200 MB |
| 20+ brokers, 1000+ topics | 200-300 MB |
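These figures are expectations for the scan process itself. If KafkaGuard shares a host with other workloads and you want a hard ceiling rather than an expectation, one option on systemd-based hosts is a transient scope; a sketch, with the 300 MB cap taken from the largest row above:

```bash
# Run the scan inside a transient cgroup scope with a hard memory cap
# (may require root or polkit authorization)
systemd-run --scope -p MemoryMax=300M \
  kafkaguard scan --bootstrap kafka:9092 --max-collectors 3
```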
## Monitoring System Integration
### Splunk Integration
Parse JSON reports and forward to Splunk for analysis:
```bash
#!/bin/bash
# splunk-forward.sh - Forward KafkaGuard reports to Splunk

REPORTS_DIR="/var/reports/kafkaguard"
SPLUNK_HEC_URL="https://splunk.example.com:8088/services/collector"
SPLUNK_HEC_TOKEN="your-hec-token"

# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)

# Extract findings and forward to Splunk
cat "$LATEST_JSON" | jq -c '.findings[]' | while read -r finding; do
  curl -k "$SPLUNK_HEC_URL" \
    -H "Authorization: Splunk $SPLUNK_HEC_TOKEN" \
    -d "{\"event\": $finding, \"sourcetype\": \"kafkaguard:scan\"}"
done

echo "Forwarded $(jq '.findings | length' "$LATEST_JSON") findings to Splunk"
```
### ELK Stack Integration

Index KafkaGuard reports in Elasticsearch for analysis and visualization:
```bash
#!/bin/bash
# elk-index.sh - Index KafkaGuard reports in Elasticsearch

REPORTS_DIR="/var/reports/kafkaguard"
ELASTICSEARCH_URL="http://elasticsearch:9200"
INDEX_NAME="kafkaguard-findings"

# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)

# Extract metadata and findings
METADATA=$(cat "$LATEST_JSON" | jq -c '.metadata')
FINDINGS=$(cat "$LATEST_JSON" | jq -c '.findings[]')

# Index each finding
echo "$FINDINGS" | while read -r finding; do
  # Combine metadata with finding
  DOCUMENT=$(jq -n \
    --argjson metadata "$METADATA" \
    --argjson finding "$finding" \
    '$metadata + $finding')

  # Index in Elasticsearch
  curl -X POST "$ELASTICSEARCH_URL/$INDEX_NAME/_doc" \
    -H 'Content-Type: application/json' \
    -d "$DOCUMENT"
done

echo "Indexed $(jq '.findings | length' "$LATEST_JSON") findings in Elasticsearch"
```
## Automation with Scripting

### Bash Script Example
Multi-cluster scanning with email notifications:
```bash
#!/bin/bash
# automated-compliance-scan.sh - Automated multi-cluster scanning with alerts
set -e

# Configuration
CLUSTERS=("kafka-dev:9092" "kafka-staging:9095" "kafka-prod:9095")
POLICIES=("baseline-dev.yaml" "enterprise-default.yaml" "enterprise-default.yaml")
REPORTS_BASE="/var/reports/kafkaguard"
ALERT_EMAIL="security@example.com"

# Timestamp for this scan run
TIMESTAMP=$(date +%Y%m%d-%H%M%S)
SUMMARY_FILE="/tmp/scan-summary-$TIMESTAMP.txt"

echo "KafkaGuard Automated Compliance Scan - $TIMESTAMP" > "$SUMMARY_FILE"
echo "=================================================" >> "$SUMMARY_FILE"
echo "" >> "$SUMMARY_FILE"

# Scan each cluster
for i in "${!CLUSTERS[@]}"; do
  CLUSTER="${CLUSTERS[$i]}"
  POLICY="${POLICIES[$i]}"
  CLUSTER_NAME=$(echo "$CLUSTER" | cut -d: -f1)

  echo "Scanning $CLUSTER_NAME..." | tee -a "$SUMMARY_FILE"

  REPORT_DIR="$REPORTS_BASE/$CLUSTER_NAME/$TIMESTAMP"
  mkdir -p "$REPORT_DIR"

  # Run scan
  kafkaguard scan \
    --bootstrap "$CLUSTER" \
    --policy "policies/$POLICY" \
    --format json,html \
    --out "$REPORT_DIR" \
    --log-level info || true

  # Parse results
  LATEST_JSON=$(ls -t "$REPORT_DIR"/scan-*.json | head -1)
  if [ -f "$LATEST_JSON" ]; then
    SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
    FAILED=$(cat "$LATEST_JSON" | jq -r '.summary.failed')
    FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')

    echo "  Score: $SCORE%" | tee -a "$SUMMARY_FILE"
    echo "  Failed Controls: $FAILED" | tee -a "$SUMMARY_FILE"
    echo "  HIGH Severity Failures: $FAILED_HIGH" | tee -a "$SUMMARY_FILE"

    if [ "$FAILED_HIGH" -gt 0 ]; then
      echo "  ⚠️ ALERT: HIGH severity failures detected!" | tee -a "$SUMMARY_FILE"
    fi
  else
    echo "  ❌ Scan failed!" | tee -a "$SUMMARY_FILE"
  fi

  echo "" | tee -a "$SUMMARY_FILE"
done

# Send email summary
mail -s "KafkaGuard Compliance Scan Summary - $TIMESTAMP" "$ALERT_EMAIL" < "$SUMMARY_FILE"
echo "Scan complete. Summary sent to $ALERT_EMAIL"
```
## Production Security Considerations

### Least Privilege Access

KafkaGuard only needs read-only access to Kafka clusters:
```bash
# Required Kafka ACLs for KafkaGuard
kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --cluster

kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --topic '*'

kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --group '*'
```

**NOT required:**
- ❌ WRITE permissions
- ❌ DELETE permissions
- ❌ ALTER permissions
- ❌ CREATE permissions
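You can confirm the principal ended up with exactly these grants using the list mode of the same tool:

```bash
# List every ACL attached to the kafkaguard principal
kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --list --principal User:kafkaguard
```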
### Credential Rotation Procedures
Implement regular credential rotation:
```bash
#!/bin/bash
# rotate-credentials.sh - Rotate KafkaGuard credentials

NEW_PASSWORD=$(openssl rand -base64 32)

# Update password on Kafka broker
kafka-configs.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --alter --add-config "SCRAM-SHA-512=[password=$NEW_PASSWORD]" \
  --entity-type users --entity-name kafkaguard

# Update in Vault
vault kv put secret/kafka/prod password="$NEW_PASSWORD"

# Test new credentials
export KAFKAGUARD_SASL_PASSWORD="$NEW_PASSWORD"
kafkaguard scan \
  --bootstrap kafka:9095 \
  --security-protocol SASL_SSL \
  --sasl-mechanism SCRAM-SHA-512 \
  --sasl-username kafkaguard \
  --policy policies/enterprise-default.yaml \
  --format json \
  --out /tmp/test-reports
RC=$?

# Capture the exit code once: exit 0 (all controls passed) and exit 1
# (scan completed with findings) both mean authentication succeeded
if [ "$RC" -eq 0 ] || [ "$RC" -eq 1 ]; then
  echo "✅ Credential rotation successful"
else
  echo "❌ Credential rotation failed - reverting"
  exit 1
fi
```
### Report Access Control

Protect reports containing sensitive cluster information:
```bash
# Set restrictive permissions on reports
chmod 600 reports/*.pdf
chmod 600 reports/*.json

# Store reports in access-controlled directory
REPORTS_DIR="/var/reports/kafkaguard"
sudo chown -R kafkaguard:security "$REPORTS_DIR"
sudo chmod 750 "$REPORTS_DIR"
```
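If reports need to leave the host (email, archives, ticket attachments), consider encrypting them as well; a sketch using GnuPG, assuming a public key for security@example.com is already imported and with an illustrative file name:

```bash
# Encrypt a report for the security team before it leaves the host
gpg --encrypt --recipient security@example.com \
  --output reports/scan-latest.json.gpg reports/scan-latest.json
```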
## Next Steps

- Policy Tiers - Learn about policy tiers and custom policy creation
- Configuration Guide - Configure KafkaGuard for production
- Scanning Guide - Learn advanced scanning techniques
- Reports Guide - Understand report formats and automation
For more advanced topics, see the Advanced Usage Guide in the repository.
## Need Help?
- Policy Tiers - Learn about custom policies
- Configuration Guide - Configure for production
- GitHub Issues - Report bugs or request features
## Ready to Get Started?
Book a demo to see KafkaGuard in action and learn how it can help secure your Kafka clusters.
📅 Book a Demo