Usage Patterns and Workflows

Common workflows for KafkaGuard across development, production, audit, CI/CD, and air-gapped environments.

Overview

This guide provides real-world usage patterns for common KafkaGuard scenarios. Each workflow includes:

  • Scenario description - What you're trying to accomplish
  • Policy selection - Which policy tier to use
  • Security configuration - Authentication and encryption setup
  • Command examples - Ready-to-use commands
  • Expected output - What success looks like
  • Success criteria - How to validate results

Dev Cluster Scanning Workflow

Scenario

Development team validating local or sandbox Kafka cluster health before deploying to higher environments.

Use Case Details

  • Environment: Development, sandbox, or local testing
  • Security: PLAINTEXT (no authentication) or basic SSL
  • Policy: baseline-dev.yaml (20 controls - reliability + operational)
  • Frequency: On-demand or pre-merge in development
  • Report Format: JSON + HTML for quick review
  • Fail Threshold: Medium (allow some issues in dev)

Configuration

Policy Selection

# baseline-dev.yaml includes:
# - 8 Reliability controls (replication, ISR, ZooKeeper)
# - 12 Operational controls (config, retention, threading)
# - NO security controls (assumes network-level security)

Security Setup

# Development clusters typically use PLAINTEXT
export KAFKAGUARD_SECURITY_PROTOCOL="PLAINTEXT"

# Or basic SSL for encrypted dev clusters
export KAFKAGUARD_SECURITY_PROTOCOL="SSL"
export KAFKAGUARD_TLS_CA_CERT="/path/to/dev-ca.pem"

Step-by-Step Workflow

Step 1: Verify Kafka Cluster is Running

# Test connectivity
telnet localhost 9092

# Or check with kafka-broker-api-versions
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

Step 2: Run Development Scan

kafkaguard scan \
  --bootstrap localhost:9092 \
  --policy policies/baseline-dev.yaml \
  --format html,json \
  --out ./reports \
  --fail-on medium \
  --log-level info

Step 3: Review Results

# Open HTML report in browser
open reports/scan-*.html  # macOS
xdg-open reports/scan-*.html  # Linux

# Or review JSON report
cat reports/scan-*.json | jq '.summary'

Step 4: Address Findings (if any)

// Example JSON summary
{
  "score": 85,
  "total_controls": 20,
  "passed": 17,
  "failed": 2,
  "not_applicable": 1,
  "scan_duration_seconds": 8.3
}
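A summary like the one above can be checked from the command line with jq. A minimal sketch, written against a sample file so it runs without a real scan (the schema follows the example; the file path is illustrative):

```shell
# Sketch: inspect the scan summary fields shown above with jq.
# A sample summary is written to a temp file so no real report is needed.
cat > /tmp/kg-summary.json <<'EOF'
{"summary": {"score": 85, "total_controls": 20, "passed": 17, "failed": 2, "not_applicable": 1}}
EOF

PASSED=$(jq -r '.summary.passed' /tmp/kg-summary.json)
TOTAL=$(jq -r '.summary.total_controls' /tmp/kg-summary.json)
FAILED=$(jq -r '.summary.failed' /tmp/kg-summary.json)
echo "Controls: $PASSED/$TOTAL passed, $FAILED failed"
```

The same pattern applies to any report produced by a real scan: point jq at the newest `scan-*.json` file instead of the sample.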

Common dev cluster issues:

  • Under-replicated partitions (temporary during development)
  • Inconsistent broker versions (mixed versions during testing)
  • Auto-create topics enabled (convenience in dev)

Success Criteria

✅ Scan completes successfully (exit code 0 or 1)
✅ No HIGH severity failures (baseline-dev has no HIGH controls)
✅ Reliability controls pass (replication ≥3, ISR ≥2)
✅ Report generated in <10 seconds

Example Output

🚀 KafkaGuard v1.0.0 - Starting scan...
✅ Connected to Kafka cluster (3 brokers detected)
🔍 Collecting broker configurations...
🔍 Collecting topic metadata...
📊 Evaluating 20 controls from baseline-dev policy...

Results:
  ✅ Passed: 17 controls
  ❌ Failed: 2 controls
  ⚠️  N/A: 1 control

Overall Score: 85%

Failed Controls:
  [MEDIUM] KG-028: Auto-create topics disabled
    Evidence: auto.create.topics.enable=true on broker1
    Remediation: Set auto.create.topics.enable=false in server.properties

  [MEDIUM] KG-031: Compression type configured
    Evidence: compression.type=none on 5 topics
    Remediation: Set compression.type=lz4 or snappy

📁 Reports generated:
  - ./reports/scan-20251115120530-abc123.html
  - ./reports/scan-20251115120530-abc123.json

✨ Scan completed in 8.3 seconds

Automation Script for Dev Workflow

#!/bin/bash
# dev-scan.sh - Development cluster scan workflow

set -e

echo "🔍 KafkaGuard Development Scan"
echo "=============================="

# Configuration
KAFKA_BROKER="${KAFKA_BROKER:-localhost:9092}"
POLICY="policies/baseline-dev.yaml"
REPORTS_DIR="./reports"

# Create reports directory
mkdir -p "$REPORTS_DIR"

# Run scan (capture the exit code without tripping `set -e`)
echo "Scanning $KAFKA_BROKER..."
EXIT_CODE=0
kafkaguard scan \
  --bootstrap "$KAFKA_BROKER" \
  --policy "$POLICY" \
  --format html,json \
  --out "$REPORTS_DIR" \
  --fail-on medium \
  --log-level info || EXIT_CODE=$?

# Report results
if [ $EXIT_CODE -eq 0 ]; then
  echo "✅ Scan passed - no issues detected"
elif [ $EXIT_CODE -eq 1 ]; then
  echo "⚠️  Scan completed with findings - review report"
  # Parse findings
  LATEST_REPORT=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)
  echo ""
  echo "Failed Controls:"
  cat "$LATEST_REPORT" | jq -r '.findings[] | select(.status == "FAILED") | "  - [\(.severity)] \(.control_id): \(.title)"'
else
  echo "❌ Scan failed with errors"
  exit 2
fi

# Open report
LATEST_HTML=$(ls -t "$REPORTS_DIR"/scan-*.html | head -1)
echo ""
echo "Opening report: $LATEST_HTML"
if [[ "$OSTYPE" == "darwin"* ]]; then
  open "$LATEST_HTML"
elif [[ "$OSTYPE" == "linux-gnu"* ]]; then
  xdg-open "$LATEST_HTML" 2>/dev/null || echo "View report at: $LATEST_HTML"
fi

exit $EXIT_CODE

Production Cluster Scanning Workflow

Scenario

Security team performing weekly or on-demand compliance scans of production Kafka clusters.

Use Case Details

  • Environment: Production
  • Security: SASL_SSL with SCRAM-SHA-512
  • Policy: enterprise-default.yaml (40 controls - security + reliability + operational)
  • Frequency: Weekly scheduled scans + on-demand for audits
  • Report Format: JSON + HTML + PDF (for audit trail)
  • Fail Threshold: High (strict compliance requirements)

Configuration

Policy Selection

# enterprise-default.yaml includes:
# - 15 Security controls (SASL, SSL/TLS, ACLs, encryption)
# - 12 Reliability controls (replication, ISR, fault tolerance)
# - 13 Operational controls (config, performance, monitoring)

Security Setup

# Production requires SASL_SSL
export KAFKAGUARD_SECURITY_PROTOCOL="SASL_SSL"
export KAFKAGUARD_SASL_MECHANISM="SCRAM-SHA-512"
export KAFKAGUARD_SASL_USERNAME="kafkaguard"
export KAFKAGUARD_TLS_CA_CERT="/etc/kafkaguard/certs/ca.pem"

# Fetch password from secret manager (recommended)
export KAFKAGUARD_SASL_PASSWORD=$(vault kv get -field=password secret/kafka/prod)

Step-by-Step Workflow

Step 1: Prepare Credentials and Certificates

# Verify certificate exists and is valid
openssl x509 -in /etc/kafkaguard/certs/ca.pem -noout -dates

# Fetch credentials from vault
export KAFKAGUARD_SASL_USERNAME="kafkaguard"
export KAFKAGUARD_SASL_PASSWORD=$(vault kv get -field=password secret/kafka/prod)

# Verify credentials are set
echo "Username: $KAFKAGUARD_SASL_USERNAME"
echo "Password length: ${#KAFKAGUARD_SASL_PASSWORD}"
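Before a long production scan it can help to fail fast on missing credentials. A minimal bash sketch (the `require_env` helper is illustrative, not part of KafkaGuard; the variable names follow the examples above):

```shell
# Sketch: fail fast when a required variable is missing before scanning.
# Bash-specific (uses ${!name} indirect expansion).
require_env() {
  local name="$1"
  if [ -z "${!name:-}" ]; then
    echo "ERROR: required variable $name is not set" >&2
    return 1
  fi
}

export KAFKAGUARD_SASL_USERNAME="kafkaguard"   # stand-in for the Vault fetch
require_env KAFKAGUARD_SASL_USERNAME && echo "credentials look set"
```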

Step 2: Run Production Scan

kafkaguard scan \
  --bootstrap kafka1.prod:9095,kafka2.prod:9095,kafka3.prod:9095 \
  --security-protocol SASL_SSL \
  --sasl-mechanism SCRAM-SHA-512 \
  --tls-ca-cert /etc/kafkaguard/certs/ca.pem \
  --policy policies/enterprise-default.yaml \
  --format json,html,pdf \
  --out /var/reports/kafkaguard \
  --fail-on high \
  --timeout 600 \
  --log-level info

Step 3: Review Report and Findings

# View summary
LATEST_REPORT=$(ls -t /var/reports/kafkaguard/scan-*.json | head -1)
cat "$LATEST_REPORT" | jq '.summary'

# List failed controls
cat "$LATEST_REPORT" | jq -r '.findings[] | select(.status == "FAILED") | "[\(.severity)] \(.control_id): \(.title)"'

# Open HTML report
xdg-open /var/reports/kafkaguard/scan-*.html

Step 4: Generate Remediation Plan

# Extract HIGH severity findings for immediate action
cat "$LATEST_REPORT" | jq '.findings[] | select(.status == "FAILED" and .severity == "HIGH")' > high-severity-findings.json

# Create remediation tickets (example with jq)
cat high-severity-findings.json | jq -r '
  "Control: \(.control_id) - \(.title)\n" +
  "Severity: \(.severity)\n" +
  "Evidence: \(.evidence)\n" +
  "Remediation: \(.remediation)\n" +
  "---"
'

Step 5: Archive Reports

# Archive reports with timestamp
ARCHIVE_DIR="/var/reports/kafkaguard/archive/$(date +%Y-%m)"
mkdir -p "$ARCHIVE_DIR"
cp /var/reports/kafkaguard/scan-*.{json,html,pdf} "$ARCHIVE_DIR/"

# Compress old reports (older than 90 days)
find /var/reports/kafkaguard/archive -name "*.pdf" -mtime +90 -exec gzip {} \;

Success Criteria

✅ Scan completes successfully (exit code 0 or 1, NOT 2)
✅ All HIGH severity controls pass
✅ All security controls pass (SASL, SSL, ACLs)
✅ TLS certificates valid for >30 days (KG-005)
✅ Reports generated (JSON + HTML + PDF)
✅ Scan duration <600 seconds

Example Production Report Summary

{
  "metadata": {
    "scan_id": "def456ghi789",
    "timestamp": "2025-11-15T14:30:00Z",
    "cluster_id": "kafka-prod-01",
    "policy": "enterprise-default",
    "policy_version": "1.0"
  },
  "summary": {
    "score": 95,
    "total_controls": 40,
    "passed": 38,
    "failed": 2,
    "not_applicable": 0,
    "severity_breakdown": {
      "high": {"total": 18, "passed": 18, "failed": 0},
      "medium": {"total": 12, "passed": 11, "failed": 1},
      "low": {"total": 10, "passed": 9, "failed": 1}
    }
  }
}
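Given the `severity_breakdown` structure above, a HIGH-severity gate reduces to a short jq check. A sketch against the sample summary (the schema is assumed from the example; the temp file stands in for a real report):

```shell
# Sketch: gate on the HIGH row of the severity breakdown shown above.
cat > /tmp/kg-prod-summary.json <<'EOF'
{"summary": {"severity_breakdown": {
  "high":   {"total": 18, "passed": 18, "failed": 0},
  "medium": {"total": 12, "passed": 11, "failed": 1},
  "low":    {"total": 10, "passed": 9,  "failed": 1}}}}
EOF

HIGH_FAILED=$(jq -r '.summary.severity_breakdown.high.failed' /tmp/kg-prod-summary.json)
if [ "$HIGH_FAILED" -eq 0 ]; then
  echo "HIGH severity controls: all passed"
else
  echo "HIGH severity controls: $HIGH_FAILED failed"
fi
```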

Production Scan Script

#!/bin/bash
# prod-scan.sh - Production cluster compliance scan

set -e

echo "🔒 KafkaGuard Production Compliance Scan"
echo "========================================"

# Configuration
KAFKA_BROKERS="kafka1.prod:9095,kafka2.prod:9095,kafka3.prod:9095"
POLICY="policies/enterprise-default.yaml"
REPORTS_DIR="/var/reports/kafkaguard"
CA_CERT="/etc/kafkaguard/certs/ca.pem"

# Create reports directory
mkdir -p "$REPORTS_DIR"

# Fetch credentials from Vault
echo "Fetching credentials from Vault..."
export KAFKAGUARD_SASL_USERNAME="kafkaguard"
export KAFKAGUARD_SASL_PASSWORD=$(vault kv get -field=password secret/kafka/prod)

# Verify certificate
echo "Verifying TLS certificate..."
CERT_EXPIRY=$(openssl x509 -in "$CA_CERT" -noout -enddate | cut -d= -f2)
echo "Certificate expires: $CERT_EXPIRY"

# Run scan (capture the exit code without tripping `set -e`)
echo "Scanning production cluster..."
EXIT_CODE=0
kafkaguard scan \
  --bootstrap "$KAFKA_BROKERS" \
  --security-protocol SASL_SSL \
  --sasl-mechanism SCRAM-SHA-512 \
  --tls-ca-cert "$CA_CERT" \
  --policy "$POLICY" \
  --format json,html,pdf \
  --out "$REPORTS_DIR" \
  --fail-on high \
  --timeout 600 \
  --log-level info || EXIT_CODE=$?

# Process results
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)
SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')

echo ""
echo "Scan Results:"
echo "  Overall Score: $SCORE%"
echo "  HIGH severity failures: $FAILED_HIGH"

if [ $EXIT_CODE -eq 0 ]; then
  echo "✅ Compliance scan PASSED"
elif [ $EXIT_CODE -eq 1 ]; then
  echo "⚠️  Compliance scan completed with findings"

  if [ "$FAILED_HIGH" -gt 0 ]; then
    echo "❌ CRITICAL: HIGH severity failures detected!"
    echo ""
    echo "Failed HIGH severity controls:"
    cat "$LATEST_JSON" | jq -r '.findings[] | select(.status == "FAILED" and .severity == "HIGH") | "  - \(.control_id): \(.title)"'

    # Send alert
    echo "Sending alert to security team..."
    # Integrate with notification system (Slack, email, PagerDuty)
  fi
else
  echo "❌ Scan failed with errors"
  exit 2
fi

# Archive reports
ARCHIVE_DIR="$REPORTS_DIR/archive/$(date +%Y-%m)"
mkdir -p "$ARCHIVE_DIR"
cp "$LATEST_JSON" "$ARCHIVE_DIR/"
cp "$(ls -t "$REPORTS_DIR"/scan-*.pdf | head -1)" "$ARCHIVE_DIR/"

echo ""
echo "Reports archived to: $ARCHIVE_DIR"

# Clean old reports (retain 90 days)
find "$REPORTS_DIR/archive" -name "scan-*.json" -mtime +90 -delete
find "$REPORTS_DIR/archive" -name "scan-*.pdf" -mtime +90 -delete

exit $EXIT_CODE

Compliance Audit Workflow

Scenario

Compliance auditor generating quarterly audit report for regulatory compliance (PCI-DSS, SOC2, ISO 27001).

Use Case Details

  • Environment: Production
  • Security: SASL_SSL + mTLS (highest security)
  • Policy: Custom audit policy or enterprise-default.yaml
  • Frequency: Quarterly or as required by auditors
  • Report Format: PDF (for audit submission) + CSV (for analysis)
  • Fail Threshold: None (document all findings, even if passing)

Configuration

Security Setup (mTLS + SASL)

# Highest security for audit access
export KAFKAGUARD_SECURITY_PROTOCOL="SASL_SSL"
export KAFKAGUARD_SASL_MECHANISM="SCRAM-SHA-512"
export KAFKAGUARD_SASL_USERNAME="auditor"
export KAFKAGUARD_SASL_PASSWORD=$(vault kv get -field=password secret/kafka/auditor)
export KAFKAGUARD_TLS_CA_CERT="/etc/kafkaguard/certs/ca.pem"
export KAFKAGUARD_TLS_CLIENT_CERT="/etc/kafkaguard/certs/auditor-cert.pem"
export KAFKAGUARD_TLS_CLIENT_KEY="/etc/kafkaguard/certs/auditor-key.pem"

Step-by-Step Audit Workflow

Step 1: Prepare Audit Environment

# Create audit directory with timestamp
AUDIT_DATE=$(date +%Y-Q%q)  # e.g., 2025-Q4 (%q requires GNU date, coreutils >= 8.26)
AUDIT_DIR="/var/audits/kafka/$AUDIT_DATE"
mkdir -p "$AUDIT_DIR"

# Document audit metadata
cat > "$AUDIT_DIR/audit-metadata.txt" <<EOF
Kafka Compliance Audit - $AUDIT_DATE
=====================================
Audit Date: $(date)
Auditor: $(whoami)
Cluster: kafka-prod-01
Policy: enterprise-default.yaml
Compliance Frameworks: PCI-DSS, SOC2, ISO 27001
Tool: KafkaGuard v$(kafkaguard version --format json | jq -r '.version')
EOF

Step 2: Run Comprehensive Audit Scan

kafkaguard scan \
  --bootstrap kafka1.prod:9095,kafka2.prod:9095,kafka3.prod:9095 \
  --security-protocol SASL_SSL \
  --sasl-mechanism SCRAM-SHA-512 \
  --tls-ca-cert /etc/kafkaguard/certs/ca.pem \
  --tls-client-cert /etc/kafkaguard/certs/auditor-cert.pem \
  --tls-client-key /etc/kafkaguard/certs/auditor-key.pem \
  --policy policies/enterprise-default.yaml \
  --format json,pdf,csv \
  --out "$AUDIT_DIR" \
  --fail-on none \
  --timeout 900 \
  --log-level info

Step 3: Generate Compliance Matrix

# Extract compliance mapping from report
LATEST_JSON=$(ls -t "$AUDIT_DIR"/scan-*.json | head -1)

# Generate PCI-DSS compliance matrix
cat "$LATEST_JSON" | jq -r '
  .findings[] |
  select(.compliance.pci_dss | length > 0) |
  "\(.control_id),\(.title),\(.status),\(.compliance.pci_dss | join(";"))"
' > "$AUDIT_DIR/pci-dss-compliance-matrix.csv"

# Generate SOC2 compliance matrix
cat "$LATEST_JSON" | jq -r '
  .findings[] |
  select(.compliance.soc2 | length > 0) |
  "\(.control_id),\(.title),\(.status),\(.compliance.soc2 | join(";"))"
' > "$AUDIT_DIR/soc2-compliance-matrix.csv"

# Generate ISO 27001 compliance matrix
cat "$LATEST_JSON" | jq -r '
  .findings[] |
  select(.compliance.iso27001 | length > 0) |
  "\(.control_id),\(.title),\(.status),\(.compliance.iso27001 | join(";"))"
' > "$AUDIT_DIR/iso27001-compliance-matrix.csv"
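The matrices above are emitted without header rows, which spreadsheet tools usually expect. A small sketch that wraps the same jq filter and prepends one (the `write_matrix` helper and sample report are illustrative, assuming the report schema shown in this guide):

```shell
# Sketch: generate a compliance matrix CSV with a header row for any
# framework key present in the report's .compliance object.
write_matrix() {
  local framework="$1" report="$2" out="$3"
  echo "control_id,title,status,${framework}_refs" > "$out"
  jq -r --arg fw "$framework" '
    .findings[] |
    select(.compliance[$fw] | length > 0) |
    "\(.control_id),\(.title),\(.status),\(.compliance[$fw] | join(";"))"
  ' "$report" >> "$out"
}

# Example with a one-finding sample report:
cat > /tmp/kg-sample-report.json <<'EOF'
{"findings": [{"control_id": "KG-001", "title": "TLS enabled", "status": "PASSED",
               "compliance": {"pci_dss": ["4.1"], "soc2": [], "iso27001": []}}]}
EOF
write_matrix pci_dss /tmp/kg-sample-report.json /tmp/kg-pci.csv
cat /tmp/kg-pci.csv
```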

Step 4: Document Exceptions and Remediation

# Extract failed controls for remediation tracking
cat "$LATEST_JSON" | jq '.findings[] | select(.status == "FAILED")' > "$AUDIT_DIR/failed-controls.json"

# Create remediation tracker
cat "$AUDIT_DIR/failed-controls.json" | jq -r '
  "Control ID: \(.control_id)\n" +
  "Title: \(.title)\n" +
  "Severity: \(.severity)\n" +
  "Category: \(.category)\n" +
  "Evidence: \(.evidence)\n" +
  "Remediation: \(.remediation)\n" +
  "Compliance Impact: PCI-DSS \(.compliance.pci_dss | join(", ")), SOC2 \(.compliance.soc2 | join(", "))\n" +
  "Status: [ ] Acknowledged  [ ] Remediated  [ ] Accepted Risk\n" +
  "Target Date: _________\n" +
  "Owner: _________\n" +
  "---\n"
' > "$AUDIT_DIR/remediation-tracker.txt"

Step 5: Package Audit Report

# Create audit package
cd "$AUDIT_DIR/.."
tar -czf "kafka-audit-$AUDIT_DATE.tar.gz" "$AUDIT_DATE/"

# Generate checksum for integrity
sha256sum "kafka-audit-$AUDIT_DATE.tar.gz" > "kafka-audit-$AUDIT_DATE.tar.gz.sha256"

echo "Audit package created: kafka-audit-$AUDIT_DATE.tar.gz"

Success Criteria for Audit

✅ Scan completes successfully (captures full cluster state)
✅ All 40 enterprise controls evaluated
✅ PDF report generated (audit-ready format)
✅ CSV report generated (for spreadsheet analysis)
✅ JSON report generated (for programmatic analysis)
✅ Compliance matrices created (PCI-DSS, SOC2, ISO 27001)
✅ Failed controls documented with remediation plans
✅ Audit package created with integrity checksum

Audit Report Submission

#!/bin/bash
# submit-audit.sh - Package and submit audit report

AUDIT_DATE=$(date +%Y-Q%q)
AUDIT_DIR="/var/audits/kafka/$AUDIT_DATE"
SUBMISSION_DIR="/mnt/audit-submissions"

# Create submission package
mkdir -p "$SUBMISSION_DIR"

# Copy reports
cp "$AUDIT_DIR"/scan-*.pdf "$SUBMISSION_DIR/kafka-compliance-report-$AUDIT_DATE.pdf"
cp "$AUDIT_DIR"/pci-dss-compliance-matrix.csv "$SUBMISSION_DIR/"
cp "$AUDIT_DIR"/soc2-compliance-matrix.csv "$SUBMISSION_DIR/"
cp "$AUDIT_DIR"/iso27001-compliance-matrix.csv "$SUBMISSION_DIR/"
cp "$AUDIT_DIR"/remediation-tracker.txt "$SUBMISSION_DIR/"
cp "$AUDIT_DIR"/audit-metadata.txt "$SUBMISSION_DIR/"

# Create README for auditors
cat > "$SUBMISSION_DIR/README.txt" <<EOF
Kafka Compliance Audit Report - $AUDIT_DATE
============================================

This package contains the quarterly Kafka compliance audit report.

Contents:
1. kafka-compliance-report-$AUDIT_DATE.pdf - Full audit report
2. pci-dss-compliance-matrix.csv - PCI-DSS control mapping
3. soc2-compliance-matrix.csv - SOC2 Trust Service Criteria mapping
4. iso27001-compliance-matrix.csv - ISO 27001 control mapping
5. remediation-tracker.txt - Failed controls and remediation plans
6. audit-metadata.txt - Audit execution details

Tool: KafkaGuard (https://github.com/aiopsone/kafkaguard)
Policy: Enterprise Default (40 controls)
Scan Date: $(date)
Cluster: kafka-prod-01

For questions, contact: security@example.com
EOF

# Create submission archive
cd "$SUBMISSION_DIR/.."
tar -czf "kafka-audit-submission-$AUDIT_DATE.tar.gz" "$(basename "$SUBMISSION_DIR")/"

echo "Audit submission package ready: kafka-audit-submission-$AUDIT_DATE.tar.gz"

CI/CD Integration Workflow

Scenario

Automated compliance scanning in CI/CD pipelines (GitHub Actions, GitLab CI, Jenkins) to block deployments on policy violations.

Use Case Details

  • Environment: CI/CD pipeline
  • Security: Environment variables for credentials
  • Policy: enterprise-default.yaml (production standards)
  • Frequency: Every commit, PR, or deployment
  • Report Format: JSON (for parsing) + HTML (for review)
  • Fail Threshold: High (block deployment on HIGH findings)

GitHub Actions Example

# .github/workflows/kafka-compliance.yml
name: Kafka Compliance Scan

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * *'  # Daily at 2 AM

jobs:
  compliance-scan:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Download KafkaGuard
        run: |
          curl -LO https://github.com/aiopsone/kafkaguard-releases/releases/latest/download/kafkaguard_Linux_x86_64.tar.gz && tar -xzf kafkaguard_Linux_x86_64.tar.gz
          chmod +x kafkaguard-linux-amd64
          sudo mv kafkaguard-linux-amd64 /usr/local/bin/kafkaguard

      - name: Verify KafkaGuard installation
        run: kafkaguard version

      - name: Run Kafka compliance scan
        env:
          KAFKAGUARD_SASL_USERNAME: ${{ secrets.KAFKA_USERNAME }}
          KAFKAGUARD_SASL_PASSWORD: ${{ secrets.KAFKA_PASSWORD }}
        run: |
          kafkaguard scan \
            --bootstrap ${{ secrets.KAFKA_BROKER }} \
            --security-protocol SASL_SSL \
            --sasl-mechanism SCRAM-SHA-512 \
            --tls-ca-cert certs/ca.pem \
            --policy policies/enterprise-default.yaml \
            --format json,html \
            --out ./reports \
            --fail-on high \
            --no-color
        continue-on-error: true
        id: scan

      - name: Parse scan results
        if: always()
        run: |
          LATEST_JSON=$(ls -t reports/scan-*.json | head -1)
          SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
          FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')

          echo "SCAN_SCORE=$SCORE" >> $GITHUB_ENV
          echo "FAILED_HIGH=$FAILED_HIGH" >> $GITHUB_ENV

          echo "### Kafka Compliance Scan Results" >> $GITHUB_STEP_SUMMARY
          echo "- **Overall Score:** $SCORE%" >> $GITHUB_STEP_SUMMARY
          echo "- **HIGH Severity Failures:** $FAILED_HIGH" >> $GITHUB_STEP_SUMMARY

      - name: Upload scan reports
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: kafka-compliance-reports
          path: reports/
          retention-days: 90

      - name: Comment on PR (if PR)
        if: github.event_name == 'pull_request' && always()
        uses: actions/github-script@v6
        with:
          script: |
            const score = process.env.SCAN_SCORE;
            const failedHigh = process.env.FAILED_HIGH;
            const status = failedHigh > 0 ? '❌ FAILED' : '✅ PASSED';

            const comment = `## Kafka Compliance Scan ${status}

            - **Overall Score:** ${score}%
            - **HIGH Severity Failures:** ${failedHigh}

            [View detailed report](https://github.com/${{github.repository}}/actions/runs/${{github.run_id}})
            `;

            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });

      - name: Fail if HIGH severity findings
        if: env.FAILED_HIGH > 0
        run: |
          echo "❌ Deployment blocked: HIGH severity compliance findings detected"
          exit 1

GitLab CI Example

# .gitlab-ci.yml
stages:
  - compliance

kafka-compliance-scan:
  stage: compliance
  image: ubuntu:22.04

  before_script:
    - apt-get update && apt-get install -y curl ca-certificates jq
    - curl -LO https://github.com/aiopsone/kafkaguard-releases/releases/latest/download/kafkaguard_Linux_x86_64.tar.gz && tar -xzf kafkaguard_Linux_x86_64.tar.gz
    - chmod +x kafkaguard-linux-amd64
    - mv kafkaguard-linux-amd64 /usr/local/bin/kafkaguard

  script:
    - mkdir -p reports
    - |
      kafkaguard scan \
        --bootstrap $KAFKA_BROKER \
        --security-protocol SASL_SSL \
        --sasl-mechanism SCRAM-SHA-512 \
        --sasl-username $KAFKA_USERNAME \
        --sasl-password $KAFKA_PASSWORD \
        --tls-ca-cert certs/ca.pem \
        --policy policies/enterprise-default.yaml \
        --format json,html \
        --out ./reports \
        --fail-on high \
        --no-color || EXIT_CODE=$?

    - |
      LATEST_JSON=$(ls -t reports/scan-*.json | head -1)
      SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
      FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')

      echo "Scan Score: $SCORE%"
      echo "HIGH Severity Failures: $FAILED_HIGH"

      if [ "$FAILED_HIGH" -gt 0 ]; then
        echo "❌ HIGH severity findings detected - blocking deployment"
        exit 1
      fi

  artifacts:
    when: always
    paths:
      - reports/
    expire_in: 90 days

  only:
    - main
    - develop
    - merge_requests

Jenkins Pipeline Example

// Jenkinsfile
pipeline {
  agent any

  environment {
    KAFKAGUARD_SASL_USERNAME = credentials('kafka-username')
    KAFKAGUARD_SASL_PASSWORD = credentials('kafka-password')
    KAFKA_BROKER = 'kafka.prod.example.com:9095'
  }

  stages {
    stage('Install KafkaGuard') {
      steps {
        sh '''
          curl -LO https://github.com/aiopsone/kafkaguard-releases/releases/latest/download/kafkaguard_Linux_x86_64.tar.gz && tar -xzf kafkaguard_Linux_x86_64.tar.gz
          chmod +x kafkaguard-linux-amd64
          mv kafkaguard-linux-amd64 /usr/local/bin/kafkaguard
          kafkaguard version
        '''
      }
    }

    stage('Run Compliance Scan') {
      steps {
        script {
          def exitCode = sh(
            script: '''
              mkdir -p reports
              kafkaguard scan \
                --bootstrap $KAFKA_BROKER \
                --security-protocol SASL_SSL \
                --sasl-mechanism SCRAM-SHA-512 \
                --tls-ca-cert certs/ca.pem \
                --policy policies/enterprise-default.yaml \
                --format json,html \
                --out ./reports \
                --fail-on high \
                --no-color
            ''',
            returnStatus: true
          )

          env.SCAN_EXIT_CODE = exitCode
        }
      }
    }

    stage('Parse Results') {
      steps {
        sh '''
          LATEST_JSON=$(ls -t reports/scan-*.json | head -1)
          SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
          FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')

          echo "Scan Score: $SCORE%"
          echo "HIGH Severity Failures: $FAILED_HIGH"

          echo "$SCORE" > reports/score.txt
          echo "$FAILED_HIGH" > reports/failed-high.txt
        '''

        script {
          env.SCAN_SCORE = readFile('reports/score.txt').trim()
          env.FAILED_HIGH = readFile('reports/failed-high.txt').trim()
        }
      }
    }

    stage('Evaluate Results') {
      steps {
        script {
          if (env.SCAN_EXIT_CODE == '0') {
            currentBuild.result = 'SUCCESS'
            echo '✅ Compliance scan passed'
          } else if (env.SCAN_EXIT_CODE == '1') {
            if (env.FAILED_HIGH.toInteger() > 0) {
              currentBuild.result = 'FAILURE'
              error('❌ HIGH severity findings detected - blocking deployment')
            } else {
              currentBuild.result = 'UNSTABLE'
              echo '⚠️  Compliance findings detected (not HIGH severity)'
            }
          } else {
            currentBuild.result = 'FAILURE'
            error('❌ Scan failed with errors')
          }
        }
      }
    }
  }

  post {
    always {
      archiveArtifacts artifacts: 'reports/*', fingerprint: true
      publishHTML([
        allowMissing: false,
        alwaysLinkToLastBuild: true,
        keepAll: true,
        reportDir: 'reports',
        reportFiles: 'scan-*.html',
        reportName: 'Kafka Compliance Report'
      ])
    }

    success {
      echo "Compliance scan passed - deployment approved"
    }

    failure {
      emailext(
        subject: "Kafka Compliance Scan Failed - ${env.JOB_NAME} #${env.BUILD_NUMBER}",
        body: """
          Kafka compliance scan failed with HIGH severity findings.

          Score: ${env.SCAN_SCORE}%
          HIGH Severity Failures: ${env.FAILED_HIGH}

          View report: ${env.BUILD_URL}Kafka_Compliance_Report/
        """,
        to: 'security@example.com'
      )
    }
  }
}

Success Criteria for CI/CD

✅ Scan completes in <300 seconds (for CI/CD efficiency)
✅ Exit code 0 = No HIGH findings, continue pipeline
✅ Exit code 1 = HIGH findings, block deployment
✅ Reports archived for audit trail
✅ Failed builds notify team (email, Slack, etc.)
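The exit-code contract above can be condensed into one helper shared by all pipelines. A sketch (the `gate` function is illustrative, following this guide's convention rather than a published API):

```shell
# Sketch of the exit-code contract described above: 0 passes the pipeline,
# 1 blocks only when HIGH-severity failures are present, anything else is
# treated as a scanner error.
gate() {
  local exit_code="$1" failed_high="$2"
  case "$exit_code" in
    0) echo "pass" ;;
    1) if [ "$failed_high" -gt 0 ]; then echo "block"; else echo "warn"; fi ;;
    *) echo "error" ;;
  esac
}

echo "exit 0, 0 high failures: $(gate 0 0)"
echo "exit 1, 2 high failures: $(gate 1 2)"
```

A pipeline step would call `gate "$EXIT_CODE" "$FAILED_HIGH"` and fail the build on `block` or `error`.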


Air-Gapped Scanning Workflow

Scenario

Scanning Kafka clusters in air-gapped networks (no internet access) for high-security or classified environments.

Use Case Details

  • Environment: Air-gapped network (no internet)
  • Security: SSL or SASL_SSL (internal CA)
  • Policy: Pre-bundled policies
  • Frequency: Scheduled or on-demand
  • Report Format: All formats (JSON, HTML, PDF, CSV)
  • Challenges: No external dependencies, no updates

Step-by-Step Air-Gapped Workflow

Step 1: Create Bundle (Internet-Connected System)

#!/bin/bash
# create-airgapped-bundle.sh - Run on internet-connected system

BUNDLE_DIR="kafkaguard-bundle"
VERSION="1.0.0"

# Create bundle directory
mkdir -p "$BUNDLE_DIR"

# Download and extract KafkaGuard binary into the bundle
curl -LO https://github.com/aiopsone/kafkaguard-releases/releases/latest/download/kafkaguard_Linux_x86_64.tar.gz
tar -xzf kafkaguard_Linux_x86_64.tar.gz -C "$BUNDLE_DIR"
mv "$BUNDLE_DIR/kafkaguard-linux-amd64" "$BUNDLE_DIR/kafkaguard"
chmod +x "$BUNDLE_DIR/kafkaguard"

# Clone repository for policies and docs
git clone https://github.com/aiopsone/kafkaguard.git /tmp/kafkaguard-repo
cp -r /tmp/kafkaguard-repo/policies "$BUNDLE_DIR/"
cp -r /tmp/kafkaguard-repo/docs "$BUNDLE_DIR/"
cp /tmp/kafkaguard-repo/README.md "$BUNDLE_DIR/"

# Create bundle README
cat > "$BUNDLE_DIR/README-AIRGAPPED.txt" <<EOF
KafkaGuard Air-Gapped Bundle
============================

Version: $VERSION
Created: $(date)
Platform: linux-amd64

Installation:
1. Extract bundle
2. Run: sudo mv kafkaguard /usr/local/bin/
3. Verify: kafkaguard version

Usage:
kafkaguard scan \\
  --bootstrap kafka-internal:9092 \\
  --policy policies/enterprise-default.yaml \\
  --out /var/reports

For full documentation, see docs/user-guide/index.md
EOF

# Create tarball
tar -czf "kafkaguard-airgapped-$VERSION.tar.gz" "$BUNDLE_DIR"

# Generate checksum
sha256sum "kafkaguard-airgapped-$VERSION.tar.gz" > "kafkaguard-airgapped-$VERSION.tar.gz.sha256"

echo "Bundle created: kafkaguard-airgapped-$VERSION.tar.gz"

Step 2: Transfer to Air-Gapped Network

# Transfer via approved method:
# - USB drive (most common)
# - Secure file transfer (if available)
# - CD/DVD (highly restricted environments)

# On air-gapped system, verify checksum
sha256sum -c kafkaguard-airgapped-1.0.0.tar.gz.sha256

Step 3: Install on Air-Gapped System

# Extract bundle
tar -xzf kafkaguard-airgapped-1.0.0.tar.gz
cd kafkaguard-bundle

# Install binary
sudo mv kafkaguard /usr/local/bin/
sudo chmod +x /usr/local/bin/kafkaguard

# Install policies
sudo mkdir -p /opt/kafkaguard
sudo cp -r policies /opt/kafkaguard/
sudo cp -r docs /opt/kafkaguard/

# Verify installation
kafkaguard version
kafkaguard list-controls --policy /opt/kafkaguard/policies/enterprise-default.yaml

Step 4: Run Scan in Air-Gapped Environment

# Create reports directory
mkdir -p /var/reports/kafkaguard

# Run scan
kafkaguard scan \
  --bootstrap kafka-internal1:9092,kafka-internal2:9092 \
  --security-protocol SSL \
  --tls-ca-cert /etc/certs/internal-ca.pem \
  --policy /opt/kafkaguard/policies/enterprise-default.yaml \
  --format json,html,pdf,csv \
  --out /var/reports/kafkaguard \
  --timeout 900

# View results
ls -lh /var/reports/kafkaguard/

Step 5: Export Reports from Air-Gapped Network

# Create reports archive
REPORT_DATE=$(date +%Y%m%d)
tar -czf "kafka-reports-$REPORT_DATE.tar.gz" /var/reports/kafkaguard/

# Transfer out via approved method
# Copy to USB drive, secure transfer, or approved export process

Air-Gapped Automation Script

#!/bin/bash
# airgapped-scan.sh - Automated air-gapped scanning

set -e

echo "🔒 KafkaGuard Air-Gapped Scan"
echo "============================="

# Configuration
KAFKA_BROKERS="kafka-internal1:9092,kafka-internal2:9092"
POLICY="/opt/kafkaguard/policies/enterprise-default.yaml"
REPORTS_DIR="/var/reports/kafkaguard"
CA_CERT="/etc/certs/internal-ca.pem"

# Create reports directory
mkdir -p "$REPORTS_DIR"

# Run scan (capture the exit code without tripping `set -e`)
echo "Scanning internal Kafka cluster..."
EXIT_CODE=0
kafkaguard scan \
  --bootstrap "$KAFKA_BROKERS" \
  --security-protocol SSL \
  --tls-ca-cert "$CA_CERT" \
  --policy "$POLICY" \
  --format json,html,pdf,csv \
  --out "$REPORTS_DIR" \
  --timeout 900 \
  --log-level info || EXIT_CODE=$?

# Process results
if [ $EXIT_CODE -eq 0 ]; then
  echo "✅ Scan completed successfully"
elif [ $EXIT_CODE -eq 1 ]; then
  echo "⚠️  Scan completed with findings"
else
  echo "❌ Scan failed with errors"
  exit 2
fi

# Create export package
EXPORT_DATE=$(date +%Y%m%d-%H%M%S)
EXPORT_DIR="/mnt/export/kafka-reports-$EXPORT_DATE"
mkdir -p "$EXPORT_DIR"

cp "$REPORTS_DIR"/scan-*.{json,html,pdf,csv} "$EXPORT_DIR/"

echo ""
echo "Reports ready for export: $EXPORT_DIR"
echo "Copy to approved media for transfer out of air-gapped network"

exit $EXIT_CODE

Success Criteria for Air-Gapped

✅ Bundle created on internet-connected system
✅ Bundle transferred to air-gapped network securely
✅ Installation successful without network access
✅ Scan completes using bundled policies
✅ All report formats generated
✅ Reports exported securely


Multi-Cluster Scanning

Scenario

Scanning multiple Kafka clusters (dev, staging, prod) with a single script for centralized reporting.

Multi-Cluster Bash Script

#!/bin/bash
# multi-cluster-scan.sh - Scan multiple Kafka clusters

set -e

echo "🌐 KafkaGuard Multi-Cluster Scan"
echo "================================"

# Cluster configuration
declare -A CLUSTERS=(
  [dev]="kafka-dev:9092|PLAINTEXT|policies/baseline-dev.yaml"
  [staging]="kafka-staging:9095|SASL_SSL|policies/enterprise-default.yaml"
  [prod]="kafka-prod:9095|SASL_SSL|policies/enterprise-default.yaml"
)

REPORTS_BASE="/var/reports/kafkaguard"
TIMESTAMP=$(date +%Y%m%d-%H%M%S)

# Scan each cluster
for CLUSTER_NAME in "${!CLUSTERS[@]}"; do
  echo ""
  echo "Scanning $CLUSTER_NAME cluster..."

  IFS='|' read -r BOOTSTRAP PROTOCOL POLICY <<< "${CLUSTERS[$CLUSTER_NAME]}"

  CLUSTER_REPORTS="$REPORTS_BASE/$CLUSTER_NAME/$TIMESTAMP"
  mkdir -p "$CLUSTER_REPORTS"

  # Set credentials based on cluster
  if [ "$PROTOCOL" == "SASL_SSL" ]; then
    export KAFKAGUARD_SASL_USERNAME="kafkaguard-$CLUSTER_NAME"
    export KAFKAGUARD_SASL_PASSWORD=$(vault kv get -field=password "secret/kafka/$CLUSTER_NAME")
    SECURITY_FLAGS="--security-protocol SASL_SSL --sasl-mechanism SCRAM-SHA-512 --tls-ca-cert /etc/kafkaguard/certs/ca.pem"
  else
    SECURITY_FLAGS="--security-protocol PLAINTEXT"
  fi

  # Run scan
  kafkaguard scan \
    --bootstrap "$BOOTSTRAP" \
    $SECURITY_FLAGS \
    --policy "$POLICY" \
    --format json,html \
    --out "$CLUSTER_REPORTS" \
    --log-level info || true

  # Parse results
  LATEST_JSON=$(ls -t "$CLUSTER_REPORTS"/scan-*.json | head -1)
  SCORE=$(jq -r '.summary.score' "$LATEST_JSON")
  FAILED=$(jq -r '.summary.failed' "$LATEST_JSON")

  echo "  Score: $SCORE%"
  echo "  Failed controls: $FAILED"
done

echo ""
echo "All scans complete. Reports: $REPORTS_BASE"
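
The jq queries above read `.summary.score` and `.summary.failed` from the JSON report. As a sketch, here is the same extraction in Python; the sample report fragment is illustrative (only the fields the script already relies on), not KafkaGuard's full report schema:

```python
import json

# Illustrative report fragment -- only the fields the jq queries use.
sample_report = """
{
  "summary": {"score": 87.5, "passed": 28, "failed": 4}
}
"""

report = json.loads(sample_report)
summary = report["summary"]

# Equivalent of: jq -r '.summary.score' and jq -r '.summary.failed'
print(f"Score: {summary['score']}%")            # Score: 87.5%
print(f"Failed controls: {summary['failed']}")  # Failed controls: 4
```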

Multi-Cluster Python Script

#!/usr/bin/env python3
# multi-cluster-scan.py - Advanced multi-cluster scanning

import subprocess
import json
from datetime import datetime
from pathlib import Path

CLUSTERS = {
    'dev': {
        'bootstrap': 'kafka-dev:9092',
        'protocol': 'PLAINTEXT',
        'policy': 'policies/baseline-dev.yaml',
        'fail_on': 'medium'
    },
    'staging': {
        'bootstrap': 'kafka-staging:9095',
        'protocol': 'SASL_SSL',
        'policy': 'policies/enterprise-default.yaml',
        'fail_on': 'high'
    },
    'prod': {
        'bootstrap': 'kafka-prod:9095',
        'protocol': 'SASL_SSL',
        'policy': 'policies/enterprise-default.yaml',
        'fail_on': 'high'
    }
}

def scan_cluster(cluster_name, config):
    """Scan a single cluster"""
    print(f"\n🔍 Scanning {cluster_name} cluster...")

    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    reports_dir = Path(f"/var/reports/kafkaguard/{cluster_name}/{timestamp}")
    reports_dir.mkdir(parents=True, exist_ok=True)

    cmd = [
        'kafkaguard', 'scan',
        '--bootstrap', config['bootstrap'],
        '--policy', config['policy'],
        '--format', 'json,html',
        '--out', str(reports_dir),
        '--fail-on', config['fail_on'],
        '--log-level', 'info'
    ]

    if config['protocol'] == 'SASL_SSL':
        cmd.extend([
            '--security-protocol', 'SASL_SSL',
            '--sasl-mechanism', 'SCRAM-SHA-512'
        ])

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        # Exit code 0 = clean, 1 = findings; anything else is a scan error
        if result.returncode not in (0, 1):
            raise RuntimeError(
                f"kafkaguard exited with {result.returncode}: {result.stderr.strip()}")

        # Parse results (the reports directory is unique to this run)
        json_file = list(reports_dir.glob('scan-*.json'))[0]
        with open(json_file) as f:
            report = json.load(f)

        summary = report['summary']
        print(f"  ✅ Score: {summary['score']}%")
        print(f"     Passed: {summary['passed']}, Failed: {summary['failed']}")

        return {
            'cluster': cluster_name,
            'score': summary['score'],
            'passed': summary['passed'],
            'failed': summary['failed'],
            'report_path': str(json_file)
        }

    except Exception as e:
        print(f"  ❌ Scan failed: {e}")
        return None

def generate_summary(results):
    """Generate aggregate summary"""
    print("\n" + "="*50)
    print("Multi-Cluster Scan Summary")
    print("="*50)

    for result in results:
        if result:
            print(f"\n{result['cluster'].upper()}:")
            print(f"  Score: {result['score']}%")
            print(f"  Controls: {result['passed']} passed, {result['failed']} failed")
            print(f"  Report: {result['report_path']}")

if __name__ == '__main__':
    results = []

    for cluster_name, config in CLUSTERS.items():
        result = scan_cluster(cluster_name, config)
        if result:
            results.append(result)

    generate_summary(results)

    # Exit nonzero if any cluster had failing controls
    if any(r['failed'] > 0 for r in results):
        exit(1)

Scheduled Scanning

Cron Job (Linux)

# /etc/cron.d/kafkaguard-daily
# Daily scan at 2 AM. Note: cron does not support backslash line
# continuations, so the command must stay on one line; in cron.d files
# the field after the schedule is the user to run as.

0 2 * * * kafkaguard /usr/local/bin/kafkaguard scan --bootstrap kafka-prod:9095 --policy /opt/kafkaguard/policies/enterprise-default.yaml --out /var/reports/kafkaguard/daily --format json,html,pdf 2>&1 | logger -t kafkaguard

Systemd Timer (Linux)

# /etc/systemd/system/kafkaguard-scan.service
[Unit]
Description=KafkaGuard Compliance Scan
After=network.target

[Service]
Type=oneshot
User=kafkaguard
Environment="KAFKAGUARD_SASL_USERNAME=kafkaguard"
EnvironmentFile=/etc/kafkaguard/credentials.env
ExecStart=/usr/local/bin/kafkaguard scan \
  --bootstrap kafka-prod:9095 \
  --security-protocol SASL_SSL \
  --sasl-mechanism SCRAM-SHA-512 \
  --tls-ca-cert /etc/kafkaguard/certs/ca.pem \
  --policy /opt/kafkaguard/policies/enterprise-default.yaml \
  --out /var/reports/kafkaguard \
  --format json,html,pdf

[Install]
WantedBy=multi-user.target
# /etc/systemd/system/kafkaguard-scan.timer
[Unit]
Description=KafkaGuard Daily Scan Timer
Requires=kafkaguard-scan.service

[Timer]
# A single OnCalendar entry; two entries (daily + 02:00) would schedule
# two runs per day, at midnight and at 02:00
OnCalendar=*-*-* 02:00:00
Persistent=true

[Install]
WantedBy=timers.target
# Enable and start timer
sudo systemctl enable kafkaguard-scan.timer
sudo systemctl start kafkaguard-scan.timer

# Check timer status
sudo systemctl list-timers kafkaguard-scan.timer

Report Management

Report Naming Convention

All reports follow this pattern:

scan-<timestamp>-<scan_id>.<format>

Example:

scan-20251115140530-abc123def456.json
scan-20251115140530-abc123def456.html
scan-20251115140530-abc123def456.pdf
scan-20251115140530-abc123def456.csv
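
A small helper to split a report filename back into its parts can be handy in archival or reporting scripts. This is a sketch; the regex matches the convention and example filenames shown above:

```python
import re

# scan-<timestamp>-<scan_id>.<format>
REPORT_NAME = re.compile(r'^scan-(\d{14})-([0-9a-f]+)\.(json|html|pdf|csv)$')

def parse_report_name(filename: str) -> tuple:
    """Split a report filename into (timestamp, scan_id, format)."""
    m = REPORT_NAME.match(filename)
    if not m:
        raise ValueError(f"not a KafkaGuard report name: {filename}")
    return m.groups()

# Example from the naming convention above:
ts, scan_id, fmt = parse_report_name("scan-20251115140530-abc123def456.json")
print(ts, scan_id, fmt)  # 20251115140530 abc123def456 json
```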

Report Archival Script

#!/bin/bash
# archive-reports.sh - Archive and clean old reports

REPORTS_DIR="/var/reports/kafkaguard"
ARCHIVE_DIR="/var/reports/kafkaguard/archive"
RETENTION_DAYS=90

# Create archive directory
mkdir -p "$ARCHIVE_DIR"

# Archive reports older than 30 days.
# Note: find's -name does not expand brace patterns, so each extension
# is matched explicitly; the archive directory itself is excluded.
find "$REPORTS_DIR" -type f -not -path "$ARCHIVE_DIR/*" \
  \( -name "scan-*.json" -o -name "scan-*.html" -o -name "scan-*.pdf" -o -name "scan-*.csv" \) \
  -mtime +30 -exec mv {} "$ARCHIVE_DIR/" \;

# Compress archived reports older than 60 days
find "$ARCHIVE_DIR" -type f -name "scan-*" ! -name "*.gz" -mtime +60 -exec gzip {} \;

# Delete compressed reports older than retention period
find "$ARCHIVE_DIR" -name "scan-*.gz" -mtime +"$RETENTION_DAYS" -delete

echo "Report archival complete"
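
The retention tiers in the script (move at 30 days, compress at 60, delete past the retention period) can be expressed as a small policy function, useful for sanity-checking the thresholds before wiring them into `find`. The tier names and ages come from the script above; the function itself is illustrative:

```python
def retention_action(age_days: int, retention_days: int = 90) -> str:
    """Map a report's age in days to the action the archival script applies."""
    if age_days > retention_days:
        return "delete"     # compressed reports past retention are removed
    if age_days > 60:
        return "compress"   # archived reports are gzipped
    if age_days > 30:
        return "archive"    # active reports move to the archive directory
    return "keep"

for age in (10, 45, 75, 120):
    print(age, retention_action(age))
```

The strict `>` comparisons mirror `find -mtime +N`, which also matches only files strictly older than N days.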

Document Information

  • Last Updated: 2025-11-15
  • Applies to Version: KafkaGuard 1.0.0+
  • Feedback: Open an issue for improvements