Skip to content

Configure Kafka ingestion on metrics collection cluster

Imported from Confluence

Content may be outdated. Verify before following any procedures. View original | Last updated: March 2022

Ensure that the Druid Kafka indexing service extension is loaded on the metrics collection cluster. See the Druid extensions documentation for information on loading Druid extensions.

Supervisor spec for OFW Druid:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "druid-metrics",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "dimensionsSpec": {
          "dimensions": [],
          "dimensionExclusions": [ "segment", "interval" ]
        },
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        }
      }
    },
    "metricsSpec": [
      {"type":"count", "name":"count"},
      {"type":"doubleSum", "name":"sum", "fieldName":"value"},
      {"type":"doubleMin", "name":"min", "fieldName":"value"},
      {"type":"doubleMax", "name":"max", "fieldName":"value"},
      {"type":"approxHistogram", "name":"histogram", "fieldName":"value", "resolution":50}
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "hour",
      "queryGranularity": "none",
      "rollup": true
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 150000,
    "maxRowsPerSegment": 5000000,
    "intermediatePersistPeriod": "PT2H",
    "resetOffsetAutomatically": true
  },
  "ioConfig": {
    "topic": "druid-metrics",
    "taskCount": 1,
    "replicas": 2,
    "useEarliestOffset": false,
    "taskDuration": "PT2H",
    "consumerProperties": {
      "bootstrap.servers": "druid-kafka-0.service.consul:32092,druid-kafka-1.service.consul:32192,druid-kafka-2.service.consul:32292"
    }
  }
}

Supervisor spec for FairBid Druid (Virginia):

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "druid-metrics",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "dimensionsSpec": {
          "dimensions": [],
          "dimensionExclusions": [
            "segment",
            "interval"
          ]
        },
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        }
      }
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "count"
      },
      {
        "type": "doubleSum",
        "name": "sum",
        "fieldName": "value"
      },
      {
        "type": "doubleMin",
        "name": "min",
        "fieldName": "value"
      },
      {
        "type": "doubleMax",
        "name": "max",
        "fieldName": "value"
      },
      {
        "type": "approxHistogram",
        "name": "histogram",
        "fieldName": "value",
        "resolution": 50
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "hour",
      "queryGranularity": "none",
      "rollup": true
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 150000,
    "maxRowsPerSegment": 5000000,
    "intermediatePersistPeriod": "PT1H",
    "resetOffsetAutomatically": true
  },
  "ioConfig": {
    "topic": "druid-metrics",
    "taskCount": 3,
    "replicas": 2,
    "useEarliestOffset": false,
    "taskDuration": "PT1H",
    "consumerProperties": {
      "bootstrap.servers": "druid-metrics-kafka-0.druid-metrics-kafka-brokers.druid.svc.cluster.local:9092,druid-metrics-kafka-1.druid-metrics-kafka-brokers.druid.svc.cluster.local:9092,druid-metrics-kafka-2.druid-metrics-kafka-brokers.druid.svc.cluster.local:9092"
    }
  }
}

Adjust the Kafka bootstrap address, the metrics topic name, and other settings accordingly.

Apply the spec by running the following command from the directory to which you downloaded the spec:

curl -XPOST -H'Content-Type: application/json' -d@clarity-kafka-supervisor.json http://<overlord_address>:8090/druid/indexer/v1/supervisor
If security is enabled, pass the admin credentials with -u:
curl -u admin:<PASSWORD> -XPOST -H'Content-Type: application/json' -d@clarity-kafka-supervisor.json http://<overlord_address>:8090/druid/indexer/v1/supervisor

Replace <overlord_address> with the IP address of the machine running the overlord process in your Imply cluster. This is typically the Master server in the Druid cluster.

References:
1. Tutorial Kafka

2. Kafka Ingestion