OpenTelemetry Collectorデプロイして、Strimzi オペレーターを使用してKubernetes上で実行されている Kafka クラスターを監視します。 コレクターは、Kafka ブローカー Pod を自動的に検出し、包括的なメトリクスを収集します。
あなたが始める前に
以下のものを用意してください:
- New Relicアカウント
- kubectl アクセスを使用したKubernetesクラスター
- JMX を有効にしたStrimzi オペレーター経由でデプロイされた Kafka
Strimzi Kafka で JMX を有効にする
Kafka クラスターの Strimzi Kafka リソースで JMX が有効になっていることを確認します。
apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata: name: my-cluster namespace: kafkaspec: kafka: jmxOptions: {} # Enables JMX with default settings # ...other broker configurationステップ 1: ネームスペースを作成する
OpenTelemetry Collector 専用のネームスペースを作成します (または既存の Kafka ネームスペースを使用します)。
$kubectl create namespace kafkaステップ2: ライセンスキーを使用してシークレットを作成する
New Relic ライセンスキーを Kubernetes シークレットとして保存します。
$kubectl create secret generic nr-license-key \> --from-literal=NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY \> -n kafkaYOUR_LICENSE_KEY実際のNew Relicライセンスキーに置き換えます。
ステップ3: OpenTelemetry Collectorをデプロイする
3.1 カスタムコレクターイメージの構築
Java ランタイムと JMX スクレーパーを使用してカスタム OpenTelemetry Collector イメージを作成します。
重要
バージョンの互換性: このガイドでは、JMX Scraper 1.52.0 と OpenTelemetry Collector 0.143.1 を使用します。古いコレクターのバージョンでは、このスクレーパーのハッシュが互換性リストに含まれていない可能性があります。最良の結果を得るには、このガイドに示されている最新バージョンを使用してください。
目標アーキテクチャー: OpenTelemetry Collectorリリースページを参照して、システム アーキテクチャー (例: linux_amd64 、 linux_arm64 、 darwin_amd64) に適したバイナリを見つけてください。 それに応じて Dockerfile 内のTARGETARCH変数を更新します。
Dockerfileとして保存:
# Multi-stage build for OpenTelemetry Collector with Java support for JMX receiver# This image bundles the OTEL Collector with Java 17 runtime and JMX scraper JAR
FROM alpine:latest as prep
# OpenTelemetry Collector BinaryARG OTEL_VERSION=0.143.1ARG TARGETARCH=linux_amd64ADD "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_${TARGETARCH}.tar.gz" /otelcontribcolRUN tar -zxvf /otelcontribcol
# JMX Scraper JAR (for JMX receiver with YAML-based configuration)ARG JMX_SCRAPER_JAR_VERSION=1.52.0ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_SCRAPER_JAR_VERSION}/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
# Set permissions for nonroot user (uid 65532)ARG USER_UID=65532RUN chown ${USER_UID} /opt/opentelemetry-jmx-scraper.jar
# Final minimal image with Java runtimeFROM openjdk:17-jre-slim
COPY /opt/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jarCOPY /otelcol-contrib /otelcol-contrib
EXPOSE 4317 4318 8888ENTRYPOINT ["/otelcol-contrib"]CMD ["--config", "/conf/otel-agent-config.yaml"]イメージをビルドしてプッシュします。
$docker build -t your-registry/otel-collector-kafka:latest .$docker push your-registry/otel-collector-kafka:latest3.2 JMX カスタムメトリック ConfigMap の作成
まず、カスタム JMX メトリクス設定を使用して ConfigMap を作成します。 jmx-kafka-config.yamlとして保存:
apiVersion: v1kind: ConfigMapmetadata: name: jmx-kafka-config namespace: kafkadata: jmx-kafka-config.yaml: | --- rules: # Per-topic custom metrics using custom MBean commands - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages in per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics using controller-based MBeans - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
# Broker uptime metric using JVM Runtime - bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
# Leader count per broker - bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name) CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: current heap usage type: gauge HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) - alert if > CPU count AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor) Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GC (shows retained memory baseline)ヒント
メトリクス コレクションをカスタマイズする: カスタム MBean ルールをkafka-jmx-config.yamlファイルに追加することで、追加の Kafka メトリクスをスクレイピングできます。
JMX メトリクス ルールの基本構文を学習します
利用可能なMBean名はKafka監視ドキュメントで確認してください。
これにより、特定の監視ニーズに基づいて、Kafka ブローカーによって公開された JMX メトリクスを収集できるようになります。
JMX ConfigMap を適用します。
$kubectl apply -f jmx-kafka-config.yaml3.3 コレクター構成マップの作成
OpenTelemetry Collector設定でConfigMapを作成します。 otel-kafka-config.yamlとして保存:
---apiVersion: v1kind: ConfigMapmetadata: name: otel-collector-config namespace: kafka labels: app: otel-collectordata: otel-collector-config.yaml: | receivers: # Kafka cluster-level metrics (runs once per OTEL collector) kafkametrics/cluster: brokers: - "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092" protocol_version: 2.8.0 scrapers: - brokers - topics - consumers collection_interval: 30s metrics: kafka.topic.min_insync_replicas: enabled: true kafka.topic.replication_factor: enabled: true kafka.partition.replicas: enabled: false kafka.partition.oldest_offset: enabled: false kafka.partition.current_offset: enabled: false
# Receiver creator for dynamic per-broker JMX receivers receiver_creator: watch_observers: [k8s_observer] receivers: # JMX receiver template (created per discovered broker pod) jmx: rule: type == "pod" && labels["strimzi.io/kind"] == "Kafka" && labels["strimzi.io/cluster"] == "my-cluster" && labels["strimzi.io/name"] == "my-cluster-kafka" config: endpoint: 'service:jmx:rmi:///jndi/rmi://`endpoint`:9999/jmxrmi' jar_path: /opt/opentelemetry-jmx-scraper.jar target_system: kafka jmx_configs: /conf-jmx/jmx-kafka-config.yaml collection_interval: 30s # Set dynamic resource attributes from discovered pod resource_attributes: broker.endpoint: '`endpoint`'
exporters: otlp: endpoint: https://otlp.nr-data.net:4317 tls: insecure: false sending_queue: num_consumers: 12 queue_size: 5000 retry_on_failure: enabled: true headers: api-key: ${NEW_RELIC_LICENSE_KEY}
processors: # Batch processor for efficiency batch/aggregation: send_batch_size: 1024 timeout: 30s
# Memory limiter to prevent OOM memory_limiter: limit_percentage: 80 spike_limit_percentage: 30 check_interval: 1s
# Detect system resources resourcedetection: detectors: [env, docker, system] timeout: 5s override: false
# Add Kafka cluster metadata resource/kafka_metadata: attributes: - key: kafka.cluster.name value: my-cluster action: upsert
# Extract Kubernetes attributes k8sattributes: auth_type: serviceAccount passthrough: false extract: metadata: - k8s.pod.name - k8s.pod.uid - k8s.namespace.name - k8s.node.name labels: - tag_name: strimzi.cluster key: strimzi.io/cluster from: pod - tag_name: strimzi.kind key: strimzi.io/kind from: pod
# Transform metrics for New Relic UI transform: metric_statements: - context: metric statements: # Clean up descriptions and units - set(description, "") where description != "" - set(unit, "") where unit != ""
- context: resource statements: # Extract broker.id from k8s.pod.name: my-cluster-kafka-0 -> 0 (supports multi-digit) - set(attributes["broker.id"], ExtractPatterns(attributes["k8s.pod.name"], ".*-(?P<broker_id>\\d+)$")["broker_id"]) where attributes["k8s.pod.name"] != nil
# Remove broker.id for cluster-level metrics transform/remove_broker_id: metric_statements: - context: resource statements: - delete_key(attributes, "broker.id") - delete_key(attributes, "broker.endpoint") - delete_key(attributes, "k8s.pod.name")
# Topic sum aggregation for replicas_in_sync metricstransform/kafka_topic_sum_aggregation: transforms: - include: kafka.partition.replicas_in_sync action: insert new_name: kafka.partition.replicas_in_sync.total operations: - action: aggregate_labels label_set: [ topic ] aggregation_type: sum
# Filter to include only cluster-level metrics filter/include_cluster_metrics: metrics: include: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
# Filter to exclude cluster-level metrics from broker pipeline filter/exclude_cluster_metrics: metrics: exclude: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
# Convert cumulative metrics to delta for New Relic cumulativetodelta:
extensions: # K8s observer extension k8s_observer: auth_type: serviceAccount observe_pods: true observe_nodes: false
service: extensions: [k8s_observer]
pipelines: # Per-broker metrics pipeline (with broker.id) metrics/broker: receivers: - receiver_creator - kafkametrics/cluster processors: - memory_limiter - resourcedetection - resource/kafka_metadata - k8sattributes - filter/exclude_cluster_metrics - transform - metricstransform/kafka_topic_sum_aggregation - cumulativetodelta - batch/aggregation exporters: [otlp]
# Cluster-level metrics pipeline (without broker.id, aggregated) metrics/cluster: receivers: - receiver_creator processors: - memory_limiter - resourcedetection - resource/kafka_metadata - k8sattributes - filter/include_cluster_metrics - transform/remove_broker_id - metricstransform/kafka_topic_sum_aggregation - cumulativetodelta - batch/aggregation exporters: [otlp]設定メモ:
my-cluster-kafka-bootstrapStrimzi Kafka サービス名に置き換えますruleとkafka.cluster.nameのmy-clusterクラスタ名に置き換えます- ネームスペースが異なる場合はネームスペースを更新します
kafka - OTLP エンドポイント:
https://otlp.nr-data.net:4317(米国リージョン) またはhttps://otlp.eu01.nr-data.net:4317(EU リージョン) を使用します。他の地域のOTLP エンドポイントの構成を参照してください receiver_creatorは Strimzi ラベルを使用して Kafka ブローカー ポッドを自動的に検出します
ConfigMap を適用します。
$kubectl apply -f otel-kafka-config.yaml3.4 コレクターをデプロイする
デプロイメントを作成します。otel-collector-deployment.yamlとして保存:
apiVersion: apps/v1kind: Deploymentmetadata: name: otel-collector namespace: kafkaspec: replicas: 1 selector: matchLabels: app: otel-collector template: metadata: labels: app: otel-collector spec: serviceAccountName: otel-collector containers: - name: otel-collector image: your-registry/otel-collector-kafka:latest env: - name: NEW_RELIC_LICENSE_KEY valueFrom: secretKeyRef: name: nr-license-key key: NEW_RELIC_LICENSE_KEY resources: limits: cpu: "1" memory: "2Gi" requests: cpu: "500m" memory: "1Gi" volumeMounts: - name: vol-kafka-test-cluster mountPath: /conf - name: jmx-config mountPath: /conf-jmx ports: - containerPort: 4317 # OTLP gRPC - containerPort: 4318 # OTLP HTTP - containerPort: 8888 # Metrics volumes: - name: vol-kafka-test-cluster configMap: name: otel-collector-config items: - key: otel-collector-config.yaml path: otel-agent-config.yaml - name: jmx-config configMap: name: jmx-kafka-config items: - key: jmx-kafka-config.yaml path: jmx-kafka-config.yaml---apiVersion: v1kind: ServiceAccountmetadata: name: otel-collector namespace: kafka---apiVersion: rbac.authorization.k8s.io/v1kind: ClusterRolemetadata: name: otel-collectorrules: - apiGroups: [""] resources: ["pods", "nodes"] verbs: ["get", "list", "watch"]---apiVersion: rbac.authorization.k8s.io/v1kind: ClusterRoleBindingmetadata: name: otel-collectorsubjects: - kind: ServiceAccount name: otel-collector namespace: kafkaroleRef: kind: ClusterRole name: otel-collector apiGroup: rbac.authorization.k8s.ioリソース設定:
- 上記のリソース制限は、中規模の Kafka クラスター (5~10 ブローカー、20~100 トピック) に適しています。
デプロイメントを適用します。
$kubectl apply -f otel-collector-deployment.yamlコレクターが実行中であることを確認します。
$kubectl get pods -n kafka -l app=otel-collector$kubectl logs -n kafka -l app=otel-collector -fステップ 4: (オプション) 計装プロデューサーまたは消費者アプリケーション
Kubernetesで実行されている Kafka プロデューサーおよび消費者アプリケーションからアプリケーション レベルのテレメトリーを収集するには、 OpenTelemetry Javaエージェントを使用してそれらを計装します。
Kafka アプリケーションを計装する
Kafka プロデューサーまたは消費者アプリケーションを計測するには、 OpenTelemetry Javaエージェントを既存のデプロイメントに追加します。
Javaエージェントをダウンロードします。init コンテナを追加してエージェント JAR をダウンロードします。
initContainers:- name: download-otel-agentimage: busybox:latestcommand:- sh- -c- |wget -O /otel/opentelemetry-javaagent.jar \https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jarvolumeMounts:- name: otel-agentmountPath: /otelJavaエージェントを構成します: 環境変数をアプリケーション コンテナーに追加します。
env:- name: JAVA_TOOL_OPTIONSvalue: >--javaagent:/otel/opentelemetry-javaagent.jar-Dotel.service.name="kafka-producer"-Dotel.resource.attributes="kafka.cluster.name=my-cluster"-Dotel.exporter.otlp.endpoint="http://localhost:4317"-Dotel.exporter.otlp.protocol="grpc"-Dotel.metrics.exporter="otlp"-Dotel.traces.exporter="otlp"-Dotel.logs.exporter="otlp"-Dotel.instrumentation.kafka.experimental-span-attributes="true"-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true"-Dotel.instrumentation.kafka.producer-propagation.enabled="true"-Dotel.instrumentation.kafka.enabled="true"volumeMounts:- name: otel-agentmountPath: /otelボリュームを追加します: ボリューム定義を含めます:
volumes:- name: otel-agentemptyDir: {}
交換する:
kafka-producerアプリケーションに固有の名前を付けるmy-clusterあなたのKafkaクラスタ名で
ヒント
上記の設定は、localhost:4317 で実行されているOpenTelemetry Collectorにテレメトリーを送信します。 この設定を使用して独自のコレクターを展開します。
receivers: otlp: protocols: grpc: endpoint: "0.0.0.0:4317"
exporters: otlp/newrelic: endpoint: https://otlp.nr-data.net:4317 headers: api-key: "${NEW_RELIC_LICENSE_KEY}" compression: gzip timeout: 30s
service: pipelines: traces: receivers: [otlp] exporters: [otlp/newrelic] metrics: receivers: [otlp] exporters: [otlp/newrelic] logs: receivers: [otlp] exporters: [otlp/newrelic]これにより、処理をカスタマイズしたり、フィルターを追加したり、複数のバックエンドにルーティングしたりできるようになります。その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。
Javaエージェントは、コード変更なしですぐに使用できる Kafka 計装を提供し、以下をキャプチャします。
- リクエストのレイテンシ
- スループット メトリクス
- エラー率
- 分散型トレース
高度な設定については、 Kafka 計装ドキュメントを参照してください。
ステップ5: (オプション) Kafkaブローカーログを転送する
Kubernetesから Kafka ブローカーのログを収集し、 New Relicに送信するには、 OpenTelemetry Collectorでファイル ログ レシーバーを設定します。
データを検索する
数分後、Kafka メトリクスがNew Relicに表示されるはずです。 New Relic UI のさまざまなビューで Kafka メトリクスを探索する詳細な手順については、 「データの検索」を参照してください。
NRQL を使用してデータをクエリすることもできます。
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'トラブルシューティング
次のステップ
- Kafka メトリクスを調べる- 完全なメトリクスリファレンスを見る
- カスタムダッシュボードの作成- Kafka データの視覚化を構築します
- アラートのセットアップ- 消費者のラグやレプリケーションが不十分なパーティションなどの重要なメトリクスを監視します