• /
  • EnglishEspañolFrançais日本語한국어Português
  • ログイン今すぐ開始

この機械翻訳は、参考として提供されています。

英語版と翻訳版に矛盾がある場合は、英語版が優先されます。詳細については、このページを参照してください。

問題を作成する

OpenTelemetryを使用したKubernetes上のセルフマネージドKafkaのモニター

OpenTelemetry Collectorをデプロイしてメトリクスを収集し、New Relicに転送することで、Kubernetes上で実行されている自己管理型のApache Kafkaクラスタをモニターします。

アーキテクチャー

New Relicは、セルフマネージドKubernetes Kafkaを監視するための2つのアプローチをサポートしています:OpenTelemetry JavaエージェントまたはPrometheus JMX Exporterです。以下の図は、各アプローチのデータフローを示しています。

Kubernetes self-managed Kafka monitoring architecture

インストレーション手順

以下の手順に従って、ブローカーにOpenTelemetry Javaエージェントをインストールし、メトリクスとログを収集してNew Relicに送信するためのコレクターをデプロイして、包括的なKafka監視をセットアップします。

あなたが始める前に

以下のものを用意してください:

  • New Relicアカウント
  • kubectlアクセスを持つKubernetesクラスタ
  • StatefulSetとしてデプロイされたKafka
  • Kafka StatefulSet を変更および再デプロイする機能

OpenTelemetry Collector をデプロイする

クラスタにOpenTelemetryコレクターをデプロイします。このステップでは、Javaエージェントが各ブローカーポッドからどのJMXメトリクスを収集するかを定義するkafka-jmx-configConfigMapも作成します。次の手順でKafkaブローカーを再起動する前に、コレクターが実行中である必要があります。

ステップ1. New Relicの認証情報シークレットを作成する

ヒント

その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。

ステップ2. コレクター設定を含むvalues.yamlを作成する

NRDOTとOpenTelemetryのコレクターはどちらも同一の設定を使用します。希望するコレクターイメージを選択してください:

高度な設定オプションについては、次を参照してください:

  • OTLPレシーバードキュメント

  • Kafka メトリクスレシーバーのドキュメント

    ステップ3. Helmを使用してOpenTelemetry Collectorをインストールする

    bash
    $
    helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
    $
    helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \
    >
    --install \
    >
    --namespace newrelic \
    >
    --create-namespace \
    >
    -f values.yaml

    ステップ4。デプロイメントを確認する

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector
    $
    $
    # View logs to verify metrics are being received from broker pods
    $
    kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50

ステップ1. New Relicの認証情報シークレットを作成する

ヒント

その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。

ステップ2. マニフェストファイルを作成する

NRDOTとOpenTelemetryのコレクターはどちらも同一の設定を使用します。コンテナイメージのみが異なります。どちらもまた、Kafkaネームスペースにkafka-jmx-configConfigMapを適用する必要があります。

kafka-jmx-config.yamlを作成 - JavaエージェントのJMXメトリクス設定(Kafkaネームスペースに適用):

apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-jmx-config
namespace: kafka # TODO: Replace with your Kafka namespace
data:
kafka-jmx-config.yaml: |
---
rules:
# Per-topic custom metrics
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
mapping:
Count:
metric: kafka.message.count
type: counter
desc: The number of messages received by the broker
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.count
type: &type counter
desc: &desc The number of requests received by the broker
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.failed
type: &type counter
desc: &desc The number of requests to the broker resulting in a failure
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
99thPercentile:
metric: kafka.request.time.99p
type: gauge
desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize
mapping:
Value:
metric: kafka.request.queue
type: gauge
desc: Size of the request queue
unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
metricAttribute:
direction: const(in)
mapping:
Count:
metric: &metric kafka.network.io
type: &type counter
desc: &desc The bytes received or sent by the broker
unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
metricAttribute:
direction: const(out)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
metricAttribute:
type: param(delayedOperation)
mapping:
Value:
metric: kafka.purgatory.size
type: gauge
desc: The number of requests waiting in purgatory
unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount
mapping:
Value:
metric: kafka.partition.count
type: gauge
desc: The number of partitions on the broker
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
mapping:
Value:
metric: kafka.partition.offline
type: gauge
desc: The number of partitions offline
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
mapping:
Value:
metric: kafka.partition.under_replicated
type: gauge
desc: The number of under replicated partitions
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
metricAttribute:
operation: const(shrink)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
metricAttribute:
operation: const(expand)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
mapping:
Value:
metric: kafka.max.lag
type: gauge
desc: The max lag in messages between follower and leader replicas
unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
mapping:
Value:
metric: kafka.controller.active.count
type: gauge
desc: Number of active controllers in the cluster
unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
mapping:
Count:
metric: kafka.leader.election.rate
type: counter
desc: The leader election count
unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
mapping:
Count:
metric: kafka.unclean.election.rate
type: counter
desc: Unclean leader election count
unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
Count:
metric: kafka.request.time.total
type: counter
desc: The total time the broker has taken to service requests
50thPercentile:
metric: kafka.request.time.50p
type: gauge
desc: The 50th percentile time the broker has taken to service requests
Mean:
metric: kafka.request.time.avg
type: gauge
desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
unit: ms
type: gauge
prefix: kafka.logs.flush.
mapping:
Count:
metric: count
unit: '{flush}'
type: counter
desc: Log flush count
50thPercentile:
metric: time.50p
desc: Log flush time - 50th percentile
99thPercentile:
metric: time.99p
desc: Log flush time - 99th percentile
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: Committed heap memory
type: gauge
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute)
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC

ステップ3。マニフェストをデプロイする

bash
$
# Create namespace if it doesn't exist
$
kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -
$
$
# Apply JMX ConfigMap to the Kafka namespace
$
kubectl apply -f kafka-jmx-config.yaml
$
$
# Apply collector ConfigMap
$
kubectl apply -f collector-configmap.yaml
$
$
# Apply Deployment and Service
$
kubectl apply -f collector-deployment.yaml

ステップ4。デプロイメントを確認する

bash
$
# Check pod status
$
kubectl get pods -n newrelic -l app=otel-collector
$
$
# View logs to verify metrics are being received from broker pods
$
kubectl logs -n newrelic -l app=otel-collector --tail=50

Javaエージェント用にKafka StatefulSetを設定する

コレクターが実行中になったので、Kafka StatefulSetにパッチを適用してOpenTelemetry JavaエージェントのJARをダウンロードするinitコンテナを追加し、KAFKA_OPTS経由でKafkaブローカーのJVMにアタッチします。

既存のKafka StatefulSetマニフェストに以下のセクションを追加します:

spec:
template:
spec:
# 1. Init container: downloads OTel Java agent JAR before Kafka starts
initContainers:
- name: download-otel-agent
image: busybox:latest
command:
- sh
- -c
- |
wget -O /otel-agent/opentelemetry-javaagent.jar \
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
containers:
- name: kafka # TODO: Replace with your Kafka container name
# 2. Attach OTel Java agent to the Kafka broker JVM
env:
- name: KAFKA_OPTS
value: >-
-javaagent:/otel-agent/opentelemetry-javaagent.jar
-Dotel.jmx.enabled=true
-Dotel.jmx.config=/jmx-config/kafka-jmx-config.yaml
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.runtime-telemetry.enabled=false
-Dotel.metric.export.interval=30000
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
- name: jmx-config
mountPath: /jmx-config
# 3. Volumes: emptyDir for JAR, ConfigMap for JMX rules
volumes:
- name: otel-agent
emptyDir: {}
- name: jmx-config
configMap:
name: kafka-jmx-config # Deployed with the collector in the previous step

ヒント

前のステップで、kafka-jmx-configConfigMapはコレクターとともにデプロイされました。otel.exporter.otlp.endpointの値http://otel-collector.newrelic.svc.cluster.local:4317は、コレクターがサービス名otel-collectornewrelicネームスペースにデプロイされていることを前提としています。異なる場合は、実際のコレクターサービスのDNSと一致するように更新してください。

更新したStatefulSetを適用し、ポッドがロールするのを待ちます:

bash
$
kubectl apply -f kafka-statefulset.yaml
$
kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace

(オプション) 計装プロデューサーまたは消費者アプリケーション

重要

言語サポート: 現在、OpenTelemetry Javaエージェントを使用したKafkaクライアントの計装でサポートされているのはJavaアプリケーションのみです。

Kubernetesで実行されているKafkaプロデューサーおよび消費者アプリケーションからアプリケーションレベルのテレメトリーを収集するには、それらのアプリケーションポッドにOpenTelemetry Javaエージェントを追加します。

アプリケーションのデプロイメントにinitコンテナと環境変数を追加します:

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-producer-app
spec:
template:
spec:
initContainers:
- name: download-otel-agent
image: busybox:latest
command:
- sh
- -c
- wget -O /otel-agent/opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
containers:
- name: app
image: your-kafka-app:latest
env:
- name: JAVA_TOOL_OPTIONS
value: >-
-javaagent:/otel-agent/opentelemetry-javaagent.jar
-Dotel.service.name=order-process-service
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.traces.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.kafka.experimental-span-attributes=true
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true
-Dotel.instrumentation.kafka.producer-propagation.enabled=true
-Dotel.instrumentation.kafka.enabled=true
-Dotel.instrumentation.runtime-telemetry.enabled=false
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
volumes:
- name: otel-agent
emptyDir: {}

設定パラメーター

次の表では、キー設定について説明します。

パラメータ

説明

order-process-service

プロデューサーまたは消費者アプリケーションの一意の名前に置き換えてください。

my-kafka-cluster

ブローカーの設定で使用されているのと同じクラスタ名に置き換えます。

otel-collector.newrelic.svc.cluster.local

コレクターサービスの実際のDNS名(

<service-name>.<namespace>.svc.cluster.local

)に置き換えます

Javaエージェントは、コードを変更することなくすぐに使えるKafkaの計装を提供し、リクエストレイテンシ、スループットメトリクス、エラー率、およびディストリビューティッド(分散)トレーシングをキャプチャします。高度な設定については、Kafka計装ドキュメントを参照してください。

ブローカーポッドにPrometheus JMX Exporterをインストールし、メトリクスを収集してNew Relicに送信するコレクターをデプロイすることで、包括的なKafka監視をセットアップするには、以下の手順に従ってください。

あなたが始める前に

以下のものを用意してください:

  • New Relicアカウント
  • kubectlアクセスを持つKubernetesクラスタ
  • ヘッドレスサービス(安定したポッドDNS名用)を使用してStatefulSetとしてデプロイされたKafka
  • Kafka StatefulSet を変更および再デプロイする機能

JMX メトリクス ConfigMap の作成

収集するKafkaメトリクスを定義するJMX Exporterの設定を含むConfigMapを作成します。このConfigMapは、各Kafkaブローカーポッドにマウントされます。

kafka-jmx-config.yamlとして保存します。Kafkaがデプロイされているネームスペースに適用します:

apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-jmx-metrics
namespace: kafka # TODO: Replace with your Kafka namespace
data:
kafka-metrics-config.yml: |
startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# Cluster-level controller metrics
- pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value'
name: kafka_cluster_topic_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value'
name: kafka_cluster_partition_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value'
name: kafka_broker_fenced_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value'
name: kafka_partition_non_preferred_leader
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
name: kafka_partition_offline
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
name: kafka_controller_active_count
type: GAUGE
# Broker-level replica metrics
- pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
name: kafka_partition_under_min_isr
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value'
name: kafka_broker_leader_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value'
name: kafka_partition_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
name: kafka_partition_under_replicated
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value'
name: kafka_max_lag
type: GAUGE
# Broker topic metrics (totals)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
name: kafka_message_count
type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "out"
# Per-topic metrics (only appear after traffic flows)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
name: kafka_prod_msg_count
type: COUNTER
labels:
topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "out"
# Request metrics
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
name: kafka_request_time_99p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
name: kafka_request_queue
type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
name: kafka_purgatory_size
type: GAUGE
labels:
type: "$1"
# Controller stats
- pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
name: kafka_leader_election_rate
type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
name: kafka_unclean_election_rate
type: COUNTER
# JVM Garbage Collection
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
name: jvm_gc_collections_count
type: COUNTER
labels:
name: "$1"
# JVM Memory
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
name: jvm_memory_heap_max
type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
name: jvm_memory_heap_used
type: GAUGE
# JVM Threading and System
- pattern: 'java.lang<type=Threading><>ThreadCount'
name: jvm_thread_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
name: jvm_system_cpu_utilization
type: GAUGE
# Broker uptime
- pattern: 'java.lang<type=Runtime><>Uptime'
name: kafka_broker_uptime
type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above)
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
name: kafka_request_time_total
type: COUNTER
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
name: kafka_request_time_50p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
name: kafka_request_time_avg
type: GAUGE
labels:
type: "$1"
# Log flush metrics
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
name: kafka_logs_flush_count
type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
name: kafka_logs_flush_time_50p
type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
name: kafka_logs_flush_time_99p
type: GAUGE
# JVM GC elapsed time
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
name: jvm_gc_collections_elapsed
type: COUNTER
labels:
name: "$1"
# JVM Memory heap committed
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
name: jvm_memory_heap_committed
type: GAUGE
# JVM class loading
- pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
name: jvm_class_count
type: GAUGE
# Additional JVM OS metrics
- pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
name: jvm_system_cpu_load_1m
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
name: jvm_cpu_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
name: jvm_cpu_recent_utilization
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
name: jvm_file_descriptor_count
type: GAUGE
# JVM Memory Pool
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
name: jvm_memory_pool_used
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
name: jvm_memory_pool_max
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
name: jvm_memory_pool_used_after_last_gc
type: GAUGE
labels:
name: "$1"

ヒント

メトリクスのカスタマイズ: Prometheus JMX Exporterの例およびKafka MBeanドキュメントを参照して、パターンを追加または変更できます。

ConfigMap を適用します。

bash
$
kubectl apply -f kafka-jmx-config.yaml

JMX Exporter 用に Kafka StatefulSet を設定する

Kafka StatefulSetにパッチを適用して、Prometheus JMX Exporter JARをダウンロードするinitコンテナを追加し、KAFKA_OPTS経由でKafkaブローカーのJVMにアタッチします。

ステップ1. 既存のKafka StatefulSetマニフェストに以下のセクションを追加します:

spec:
template:
spec:
# 1. Init container: downloads JMX Exporter JAR before Kafka starts
initContainers:
- name: download-jmx-exporter
image: busybox:latest
command:
- sh
- -c
- |
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
JMX_EXPORTER_VERSION="1.5.0"
wget -O /prometheus-jmx/jmx_prometheus_javaagent.jar \
"https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"
volumeMounts:
- name: prometheus-jmx
mountPath: /prometheus-jmx
containers:
- name: kafka # TODO: Replace with your Kafka container name
# 2. Attach JMX Exporter as Java agent on port 9404
env:
- name: KAFKA_OPTS
value: "-javaagent:/prometheus-jmx/jmx_prometheus_javaagent.jar=9404:/jmx-config/kafka-metrics-config.yml"
# 3. Expose port 9404 for Prometheus scraping
ports:
- name: jmx-metrics
containerPort: 9404
protocol: TCP
volumeMounts:
- name: prometheus-jmx
mountPath: /prometheus-jmx
- name: jmx-config
mountPath: /jmx-config
# 4. Volumes: emptyDir for JAR, ConfigMap for metrics config
volumes:
- name: prometheus-jmx
emptyDir: {}
- name: jmx-config
configMap:
name: kafka-jmx-metrics # Must match the ConfigMap name from Step 2

ステップ2. 更新したStatefulSetを適用し、ポッドがロールするのを待ちます:

bash
$
kubectl apply -f kafka-statefulset.yaml
$
kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace

ステップ 3。ロールアウトが完了したら、各ブローカーポッドでメトリクスが公開されていることを確認します:

bash
$
# Replace kafka-0 and kafka with your pod name and namespace
$
kubectl exec -n kafka kafka-0 -- curl -s http://localhost:9404/metrics | grep kafka_ | head -20

重要

マルチブローカークラスタ: initコンテナとKAFKA_OPTSの設定は、StatefulSet内のすべてのポッドに自動的に適用されます。ロールアウト後、各ブローカーポッドがメトリクスを公開していることを確認します。

OpenTelemetry Collector をデプロイする

クラスタにOpenTelemetry Collectorをデプロイします。コレクターは、静的DNSターゲットを使用してKafkaブローカーのポッドをスクレイプし、インストゥルメントされたアプリケーションからのOTLPデータをポート4317でリッスンします。

Helmメソッドは、 KubernetesのデプロイOpenTelemetry Collectorに推奨されるアプローチです。

ステップ1. New Relicの認証情報シークレットを作成する

ヒント

その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。

ステップ2. コレクター設定を含むvalues.yamlを作成する

NRDOTとOpenTelemetryのコレクターはどちらも同一の設定を使用します。希望するコレクターイメージを選択してください:

高度な設定オプションについては、次の受信機のドキュメント ページを参照してください。

  • Prometheusレシーバーのドキュメント

  • Kafka メトリクスレシーバーのドキュメント

    ステップ3. Helmを使用してOpenTelemetry Collectorをインストールする

    bash
    $
    helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
    $
    helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \
    >
    --install \
    >
    --namespace newrelic \
    >
    --create-namespace \
    >
    -f values.yaml

    ステップ 4. デプロイメントを検証します:

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector
    $
    $
    # View logs to verify metrics collection
    $
    kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50

    ポート9404のKafkaブローカーポッドからのスクレイピングが成功したことを示すログが表示されるはずです。

マニフェスト インストレーション メソッドは、 Helmを使用せずにKubernetesリソースを直接制御します。

ステップ1. New Relicの認証情報シークレットを作成する

ヒント

その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。

ステップ2. マニフェストファイルを作成する

NRDOTとOpenTelemetryのコレクターはどちらも同一の設定を使用します。コンテナイメージのみが異なります。

高度な設定オプションについては、次の受信機のドキュメント ページを参照してください。

  • Prometheusレシーバーのドキュメント

  • Kafka メトリクスレシーバーのドキュメント

    ステップ3。マニフェストをデプロイする

    bash
    $
    # Create namespace if it doesn't exist
    $
    kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -
    $
    $
    # Apply ConfigMap
    $
    kubectl apply -f collector-configmap.yaml
    $
    $
    # Apply Deployment (includes ServiceAccount)
    $
    kubectl apply -f collector-deployment.yaml

    ステップ 4. デプロイメントを検証します:

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app=otel-collector
    $
    $
    # View logs to verify metrics collection
    $
    kubectl logs -n newrelic -l app=otel-collector --tail=50

    ポート9404のKafkaブローカーポッドからのスクレイピングが成功したことを示すログが表示されるはずです。

(オプション) 計装プロデューサーまたは消費者アプリケーション

重要

言語サポート: Javaアプリケーションは、OpenTelemetry Javaエージェントを使用したKafkaクライアントの計装を標準でサポートしています。

Kafkaプロデューサーおよび消費者アプリケーションからアプリケーションレベルのテレメトリーを収集するには、initコンテナでOpenTelemetry Javaエージェントを使用します:

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-producer-app
spec:
template:
spec:
initContainers:
- name: download-java-agent
image: busybox:latest
command:
- sh
- -c
- |
wget -O /otel-auto-instrumentation/opentelemetry-javaagent.jar \
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-auto-instrumentation
mountPath: /otel-auto-instrumentation
containers:
- name: app
image: your-kafka-app:latest
env:
- name: JAVA_TOOL_OPTIONS
value: >-
-javaagent:/otel-auto-instrumentation/opentelemetry-javaagent.jar
-Dotel.service.name=my-kafka-app
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.traces.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.kafka.experimental-span-attributes=true
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true
-Dotel.instrumentation.kafka.producer-propagation.enabled=true
-Dotel.instrumentation.kafka.enabled=true
-Dotel.instrumentation.runtime-telemetry.enabled=false
volumeMounts:
- name: otel-auto-instrumentation
mountPath: /otel-auto-instrumentation
volumes:
- name: otel-auto-instrumentation
emptyDir: {}

設定パラメーター

次の表では、キー設定について説明します。

パラメータ説明
service.namemy-kafka-appプロデューサーまたは消費者アプリケーションの一意の名前に置き換えます。
kafka.cluster.namemy-kafka-clusterコレクター設定で使用されているのと同じクラスタ名に置き換えます。
otlp.endpointエンドポイントhttp://otel-collector.newrelic.svc.cluster.local:4317は、コレクターがnewrelicネームスペースに次のようにデプロイされていると想定しています otel-collector

Javaエージェントは、コードを変更することなくすぐに使えるKafkaの計装を提供し、リクエストレイテンシ、スループットメトリクス、エラー率、およびディストリビューティッド(分散)トレーシングをキャプチャします。高度な設定については、Kafka計装ドキュメントを参照してください。

(オプション)Kafkaブローカーログを転送する

Kafkaブローカーのログを収集してNew Relicに送信するには、コレクターの設定にfilelogレシーバーを追加します。

データを検索する

数分後、New Relic に Kafka データが表示されるはずです。New Relic UIのさまざまなビューでKafkaデータを探索するための詳細な手順については、データの検索を参照してください。

以下の表は、各シグナルタイプの保存先をまとめています。以下のすべてのクエリで、my-kafka-clusterKAFKA_CLUSTER_NAMEの値に置き換えます:

シグナルイベントタイプ含まれるもの
指標Metricブローカー、トピック、パーティション、消費者グループ、およびJVMメトリクス
ログLogプロデューサーおよび消費者アプリケーションからのログ(OTel Javaエージェント経由)と、Javaエージェント経由で収集されたブローカーログ
トレースSpanトピックをまたぐメッセージごとのpublishおよびreceive操作を含む、プロデューサーと消費者のスパン

指標

ブローカー、トピック、パーティション、消費者グループ、およびJVMメトリクスは、Metricイベントタイプに格納されます:

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

ログ

OpenTelemetry Javaエージェントでインストゥルメントされたプロデューサーおよび消費者アプリケーションからのログ、およびブローカー上のJavaエージェントを介して収集されたブローカーログは、Logイベントタイプに保存されます:

FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

トレース

トピックをまたぐメッセージごとのpublishおよびreceive操作を含む、プロデューサーと消費者のスパンは、Spanイベントタイプに保存されます:

FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Kafka StatefulSetマニフェスト、Helm値、OTel Collectorの設定、およびサンプルのプロデューサー/消費者アプリケーションを含む完全な動作例は、New Relic OpenTelemetry Examplesリポジトリで利用可能です。

トラブルシューティング

次のステップ

Copyright © 2026 New Relic株式会社。

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.