
Advanced Usage Guide

Advanced topics, integrations, and best practices for power users and production deployments.

Custom Policy Creation

While KafkaGuard provides three built-in policy tiers (baseline-dev, enterprise-default, and finance-iso in Phase 2), you can create custom policies tailored to your organization's specific requirements.

When to Create Custom Policies

Create custom policies when you need to:

  • ✅ Enforce organization-specific compliance requirements
  • ✅ Combine controls from multiple tiers
  • ✅ Add custom controls for internal standards
  • ✅ Adjust control severity levels for your environment
  • ✅ Create environment-specific policies (dev, staging, prod)

Quick Policy Structure Reference

yaml
version: "1.0"
name: "My Custom Policy"
description: "Custom policy for our environment"
tier: "custom"

controls:
  - id: KG-XXX
    title: "Control title"
    description: "What this control checks"
    severity: HIGH | MEDIUM | LOW
    category: security | reliability | operational | compliance
    expr: |
      # CEL expression that returns true (pass) or false (fail)
      broker.config["auto.create.topics.enable"] == "false"
    remediation: |
      Step-by-step fix instructions
    compliance:
      pci_dss: ["4.1", "8.2"]
      soc2: ["CC6.1"]
      iso27001: ["A.12.6.1"]

Policy Validation

Always validate custom policies before deployment:

bash
# Validate policy syntax
kafkaguard validate-policy --policy policies/custom-policy.yaml

# Test policy against dev cluster first
kafkaguard scan \
  --bootstrap kafka-dev:9092 \
  --policy policies/custom-policy.yaml \
  --format json \
  --out ./test-reports

# Review results and iterate
cat test-reports/scan-*.json | jq '.findings[] | select(.status == "FAILED")'

For comprehensive documentation on creating custom policies, see the Policy Creation Guide in the repository.
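
If custom policies live in version control, the same validation can gate merges in CI. The following is a minimal sketch, assuming the policies/ layout used above and that validate-policy returns a non-zero exit code for an invalid policy:

bash
#!/bin/bash
# ci-validate-policies.sh - fail the CI job if any custom policy is invalid
set -euo pipefail

for policy in policies/*.yaml; do
  echo "Validating $policy"
  # Assumption: validate-policy exits non-zero when the policy is invalid
  kafkaguard validate-policy --policy "$policy"
done

echo "All policies passed validation"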


Performance Tuning

Timeout Configuration

Adjust timeout based on cluster size and network conditions:

bash
# Default timeout: 300 seconds (5 minutes)
kafkaguard scan --bootstrap kafka:9092

# Small clusters (<100 topics): 300 seconds
kafkaguard scan --bootstrap kafka:9092 --timeout 300

# Medium clusters (100-500 topics): 600 seconds (10 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 600

# Large clusters (500-1000 topics): 900 seconds (15 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 900

# Very large clusters (1000+ topics): 1800 seconds (30 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 1800
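
The thresholds above can also be applied automatically. A sketch that counts topics with the standard Kafka CLI and picks a timeout from the table (assumes kafka-topics.sh can reach the cluster; add --command-config for secured clusters):

bash
#!/bin/bash
# pick-timeout.sh - choose a scan timeout based on topic count
BOOTSTRAP="kafka:9092"

# Count topics with the standard Kafka CLI
TOPIC_COUNT=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --list | wc -l)

if [ "$TOPIC_COUNT" -lt 100 ]; then
  TIMEOUT=300
elif [ "$TOPIC_COUNT" -lt 500 ]; then
  TIMEOUT=600
elif [ "$TOPIC_COUNT" -lt 1000 ]; then
  TIMEOUT=900
else
  TIMEOUT=1800
fi

kafkaguard scan --bootstrap "$BOOTSTRAP" --timeout "$TIMEOUT"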

Parallel Collection Settings

KafkaGuard uses parallel collection by default for faster scans:

bash
# Default: parallel enabled with 6 max collectors
kafkaguard scan --bootstrap kafka:9092

# Increase max collectors for large clusters (10+ brokers)
kafkaguard scan \
  --bootstrap kafka:9092 \
  --parallel true \
  --max-collectors 10

# Reduce max collectors if experiencing memory issues
kafkaguard scan \
  --bootstrap kafka:9092 \
  --max-collectors 3

# Disable parallel collection (not recommended, slower)
kafkaguard scan \
  --bootstrap kafka:9092 \
  --parallel false

Expected Memory Usage

Cluster Size                | Expected Memory
3 brokers, <100 topics      | 50-100 MB
5 brokers, 100-500 topics   | 100-150 MB
10 brokers, 500-1000 topics | 150-200 MB
20+ brokers, 1000+ topics   | 200-300 MB
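
To verify these figures against your own cluster, you can measure peak memory for a single scan. A sketch using GNU time on Linux (the -v output includes maximum resident set size in kilobytes):

bash
# Measure peak memory (RSS) of one scan run with GNU time
/usr/bin/time -v kafkaguard scan --bootstrap kafka:9092 2>&1 | grep "Maximum resident set size"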

Monitoring System Integration

Splunk Integration

Parse JSON reports and forward to Splunk for analysis:

bash
#!/bin/bash
# splunk-forward.sh - Forward KafkaGuard reports to Splunk

REPORTS_DIR="/var/reports/kafkaguard"
SPLUNK_HEC_URL="https://splunk.example.com:8088/services/collector"
SPLUNK_HEC_TOKEN="your-hec-token"

# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)

# Extract findings and forward to Splunk
cat "$LATEST_JSON" | jq -c '.findings[]' | while read -r finding; do
  curl -k "$SPLUNK_HEC_URL" \
    -H "Authorization: Splunk $SPLUNK_HEC_TOKEN" \
    -d "{\"event\": $finding, \"sourcetype\": \"kafkaguard:scan\"}"
done

echo "Forwarded $(jq '.findings | length' "$LATEST_JSON") findings to Splunk"

ELK Stack Integration

Index KafkaGuard reports in Elasticsearch for analysis and visualization:

bash
#!/bin/bash
# elk-index.sh - Index KafkaGuard reports in Elasticsearch

REPORTS_DIR="/var/reports/kafkaguard"
ELASTICSEARCH_URL="http://elasticsearch:9200"
INDEX_NAME="kafkaguard-findings"

# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)

# Extract metadata and findings
METADATA=$(cat "$LATEST_JSON" | jq -c '.metadata')
FINDINGS=$(cat "$LATEST_JSON" | jq -c '.findings[]')

# Index each finding
echo "$FINDINGS" | while read -r finding; do
  # Combine metadata with finding
  DOCUMENT=$(jq -n \
    --argjson metadata "$METADATA" \
    --argjson finding "$finding" \
    '$metadata + $finding')

  # Index in Elasticsearch
  curl -X POST "$ELASTICSEARCH_URL/$INDEX_NAME/_doc" \
    -H 'Content-Type: application/json' \
    -d "$DOCUMENT"
done

echo "Indexed $(jq '.findings | length' "$LATEST_JSON") findings in Elasticsearch"

Automation with Scripting

Bash Script Example

Multi-cluster scanning with email notifications:

bash
#!/bin/bash
# automated-compliance-scan.sh - Automated multi-cluster scanning with alerts

set -e

# Configuration
CLUSTERS=("kafka-dev:9092" "kafka-staging:9095" "kafka-prod:9095")
POLICIES=("baseline-dev.yaml" "enterprise-default.yaml" "enterprise-default.yaml")
REPORTS_BASE="/var/reports/kafkaguard"
ALERT_EMAIL="security@example.com"

# Timestamp for this scan run
TIMESTAMP=$(date +%Y%m%d-%H%M%S)
SUMMARY_FILE="/tmp/scan-summary-$TIMESTAMP.txt"

echo "KafkaGuard Automated Compliance Scan - $TIMESTAMP" > "$SUMMARY_FILE"
echo "=================================================" >> "$SUMMARY_FILE"
echo "" >> "$SUMMARY_FILE"

# Scan each cluster
for i in "${!CLUSTERS[@]}"; do
  CLUSTER="${CLUSTERS[$i]}"
  POLICY="${POLICIES[$i]}"
  CLUSTER_NAME=$(echo "$CLUSTER" | cut -d: -f1)

  echo "Scanning $CLUSTER_NAME..." | tee -a "$SUMMARY_FILE"

  REPORT_DIR="$REPORTS_BASE/$CLUSTER_NAME/$TIMESTAMP"
  mkdir -p "$REPORT_DIR"

  # Run scan
  kafkaguard scan \
    --bootstrap "$CLUSTER" \
    --policy "policies/$POLICY" \
    --format json,html \
    --out "$REPORT_DIR" \
    --log-level info || true

  # Parse results
  LATEST_JSON=$(ls -t "$REPORT_DIR"/scan-*.json | head -1)

  if [ -f "$LATEST_JSON" ]; then
    SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
    FAILED=$(cat "$LATEST_JSON" | jq -r '.summary.failed')
    FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')

    echo "  Score: $SCORE%" | tee -a "$SUMMARY_FILE"
    echo "  Failed Controls: $FAILED" | tee -a "$SUMMARY_FILE"
    echo "  HIGH Severity Failures: $FAILED_HIGH" | tee -a "$SUMMARY_FILE"

    if [ "$FAILED_HIGH" -gt 0 ]; then
      echo "  ⚠️  ALERT: HIGH severity failures detected!" | tee -a "$SUMMARY_FILE"
    fi
  else
    echo "  ❌ Scan failed!" | tee -a "$SUMMARY_FILE"
  fi

  echo "" | tee -a "$SUMMARY_FILE"
done

# Send email summary
mail -s "KafkaGuard Compliance Scan Summary - $TIMESTAMP" "$ALERT_EMAIL" < "$SUMMARY_FILE"

echo "Scan complete. Summary sent to $ALERT_EMAIL"

Production Security Considerations

Least Privilege Access

KafkaGuard only needs read-only access to Kafka clusters:

bash
# Required Kafka ACLs for KafkaGuard
kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --cluster kafka-cluster

kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --topic '*'

kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --group '*'

NOT required:

  • ❌ WRITE permissions
  • ❌ DELETE permissions
  • ❌ ALTER permissions
  • ❌ CREATE permissions
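
After granting the ACLs, you can confirm that the kafkaguard principal holds only Describe permissions:

bash
# List all ACLs granted to the kafkaguard principal and check for anything beyond Describe
kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --list --principal User:kafkaguard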

Credential Rotation Procedures

Implement regular credential rotation:

bash
#!/bin/bash
# rotate-credentials.sh - Rotate KafkaGuard credentials

NEW_PASSWORD=$(openssl rand -base64 32)

# Update password on Kafka broker
kafka-configs.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --alter --add-config "SCRAM-SHA-512=[password=$NEW_PASSWORD]" \
  --entity-type users --entity-name kafkaguard

# Update in Vault
vault kv put secret/kafka/prod password="$NEW_PASSWORD"

# Test new credentials
export KAFKAGUARD_SASL_PASSWORD="$NEW_PASSWORD"
kafkaguard scan \
  --bootstrap kafka:9095 \
  --security-protocol SASL_SSL \
  --sasl-mechanism SCRAM-SHA-512 \
  --sasl-username kafkaguard \
  --policy policies/enterprise-default.yaml \
  --format json \
  --out /tmp/test-reports

SCAN_RC=$?

# Exit code 0 = clean scan; exit code 1 = scan completed with failed controls.
# Either way, the new credentials authenticated successfully.
if [ "$SCAN_RC" -eq 0 ] || [ "$SCAN_RC" -eq 1 ]; then
    echo "✅ Credential rotation successful"
else
    echo "❌ Credential rotation failed - reverting"
    exit 1
fi
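
The script above reports "reverting" but does not restore the old credential itself. A minimal rollback sketch, assuming the previous password is read from Vault before rotating:

bash
# Before rotating, capture the current password so it can be restored on failure
OLD_PASSWORD=$(vault kv get -field=password secret/kafka/prod)

# ... rotate and test as above; if the test fails, restore the previous password ...
kafka-configs.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --alter --add-config "SCRAM-SHA-512=[password=$OLD_PASSWORD]" \
  --entity-type users --entity-name kafkaguard
vault kv put secret/kafka/prod password="$OLD_PASSWORD"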

Report Access Control

Protect reports containing sensitive cluster information:

bash
# Set restrictive permissions on reports
chmod 600 reports/*.pdf
chmod 600 reports/*.json

# Store reports in access-controlled directory
REPORTS_DIR="/var/reports/kafkaguard"
sudo chown -R kafkaguard:security "$REPORTS_DIR"
sudo chmod 750 "$REPORTS_DIR"
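
For reports that must be retained long term, filesystem permissions can be supplemented with encryption at rest. A sketch using GPG, assuming a key exists for the security team (security@example.com is the alert address used earlier and only an assumption here):

bash
# Encrypt each report for the security team's GPG key, then remove the plaintext copy
for f in "$REPORTS_DIR"/scan-*.json; do
  gpg --encrypt --recipient security@example.com --output "$f.gpg" "$f"
  shred -u "$f"
done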

Next Steps

For additional topics not covered on this page, see the full Advanced Usage Guide in the repository.

