OpenTelemetry Collectorをデプロイしてメトリクスを収集し、New Relicに転送することで、Kubernetes上で実行されている自己管理型のApache Kafkaクラスタをモニターします。
アーキテクチャー
New Relicは、セルフマネージドKubernetes Kafkaを監視するための2つのアプローチをサポートしています:OpenTelemetry JavaエージェントまたはPrometheus JMX Exporterです。以下の図は、各アプローチのデータフローを示しています。

インストレーション手順
以下の手順に従って、ブローカーにOpenTelemetry Javaエージェントをインストールし、メトリクスとログを収集してNew Relicに送信するためのコレクターをデプロイして、包括的なKafka監視をセットアップします。
あなたが始める前に
以下のものを用意してください:
- New Relicアカウント
kubectlアクセスを持つKubernetesクラスタ- StatefulSetとしてデプロイされたKafka
- Kafka StatefulSet を変更および再デプロイする機能
OpenTelemetry Collector をデプロイする
クラスタにOpenTelemetryコレクターをデプロイします。このステップでは、Javaエージェントが各ブローカーポッドからどのJMXメトリクスを収集するかを定義するkafka-jmx-configConfigMapも作成します。次の手順でKafkaブローカーを再起動する前に、コレクターが実行中である必要があります。
ステップ1. New Relicの認証情報シークレットを作成する
ヒント
その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。
ステップ2. コレクター設定を含むvalues.yamlを作成する
NRDOTとOpenTelemetryのコレクターはどちらも同一の設定を使用します。希望するコレクターイメージを選択してください:
高度な設定オプションについては、次を参照してください:
ステップ3. Helmを使用してOpenTelemetry Collectorをインストールする
bash$helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts$helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \>--install \>--namespace newrelic \>--create-namespace \>-f values.yamlステップ4。デプロイメントを確認する
bash$# Check pod status$kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector$$# View logs to verify metrics are being received from broker pods$kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50
ステップ1. New Relicの認証情報シークレットを作成する
ヒント
その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。
ステップ2. マニフェストファイルを作成する
NRDOTとOpenTelemetryのコレクターはどちらも同一の設定を使用します。コンテナイメージのみが異なります。どちらもまた、Kafkaネームスペースにkafka-jmx-configConfigMapを適用する必要があります。
kafka-jmx-config.yamlを作成 - JavaエージェントのJMXメトリクス設定(Kafkaネームスペースに適用):
apiVersion: v1kind: ConfigMapmetadata: name: kafka-jmx-config namespace: kafka # TODO: Replace with your Kafka namespacedata: kafka-jmx-config.yaml: | --- rules: # Per-topic custom metrics - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
- bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
- bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec mapping: Count: metric: kafka.message.count type: counter desc: The number of messages received by the broker unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.count type: &type counter desc: &desc The number of requests received by the broker unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.failed type: &type counter desc: &desc The number of requests to the broker resulting in a failure unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: 99thPercentile: metric: kafka.request.time.99p type: gauge desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize mapping: Value: metric: kafka.request.queue type: gauge desc: Size of the request queue unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec metricAttribute: direction: const(in) mapping: Count: metric: &metric kafka.network.io type: &type counter desc: &desc The bytes received or sent by the broker unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec metricAttribute: direction: const(out) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch metricAttribute: type: param(delayedOperation) mapping: Value: metric: kafka.purgatory.size type: gauge desc: The number of requests waiting in purgatory unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount mapping: Value: metric: kafka.partition.count type: gauge desc: The number of partitions on the broker unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount mapping: Value: metric: kafka.partition.offline type: gauge desc: The number of partitions offline unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions mapping: Value: metric: kafka.partition.under_replicated type: gauge desc: The number of under replicated partitions unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec metricAttribute: operation: const(shrink) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec metricAttribute: operation: const(expand) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica mapping: Value: metric: kafka.max.lag type: gauge desc: The max lag in messages between follower and leader replicas unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount mapping: Value: metric: kafka.controller.active.count type: gauge desc: Number of active controllers in the cluster unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs mapping: Count: metric: kafka.leader.election.rate type: counter desc: The leader election count unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec mapping: Count: metric: kafka.unclean.election.rate type: counter desc: Unclean leader election count unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: Count: metric: kafka.request.time.total type: counter desc: The total time the broker has taken to service requests 50thPercentile: metric: kafka.request.time.50p type: gauge desc: The 50th percentile time the broker has taken to service requests Mean: metric: kafka.request.time.avg type: gauge desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs unit: ms type: gauge prefix: kafka.logs.flush. mapping: Count: metric: count unit: '{flush}' type: counter desc: Log flush count 50thPercentile: metric: time.50p desc: Log flush time - 50th percentile 99thPercentile: metric: time.99p desc: Log flush time - 99th percentile
- bean: java.lang:type=GarbageCollector,name=* mapping: CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: Committed heap memory type: gauge
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GCステップ3。マニフェストをデプロイする
$# Create namespace if it doesn't exist$kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -$
$# Apply JMX ConfigMap to the Kafka namespace$kubectl apply -f kafka-jmx-config.yaml$
$# Apply collector ConfigMap$kubectl apply -f collector-configmap.yaml$
$# Apply Deployment and Service$kubectl apply -f collector-deployment.yamlステップ4。デプロイメントを確認する
$# Check pod status$kubectl get pods -n newrelic -l app=otel-collector$
$# View logs to verify metrics are being received from broker pods$kubectl logs -n newrelic -l app=otel-collector --tail=50Javaエージェント用にKafka StatefulSetを設定する
コレクターが実行中になったので、Kafka StatefulSetにパッチを適用してOpenTelemetry JavaエージェントのJARをダウンロードするinitコンテナを追加し、KAFKA_OPTS経由でKafkaブローカーのJVMにアタッチします。
既存のKafka StatefulSetマニフェストに以下のセクションを追加します:
spec: template: spec: # 1. Init container: downloads OTel Java agent JAR before Kafka starts initContainers: - name: download-otel-agent image: busybox:latest command: - sh - -c - | wget -O /otel-agent/opentelemetry-javaagent.jar \ https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-agent mountPath: /otel-agent
containers: - name: kafka # TODO: Replace with your Kafka container name # 2. Attach OTel Java agent to the Kafka broker JVM env: - name: KAFKA_OPTS value: >- -javaagent:/otel-agent/opentelemetry-javaagent.jar -Dotel.jmx.enabled=true -Dotel.jmx.config=/jmx-config/kafka-jmx-config.yaml -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.runtime-telemetry.enabled=false -Dotel.metric.export.interval=30000 volumeMounts: - name: otel-agent mountPath: /otel-agent - name: jmx-config mountPath: /jmx-config
# 3. Volumes: emptyDir for JAR, ConfigMap for JMX rules volumes: - name: otel-agent emptyDir: {} - name: jmx-config configMap: name: kafka-jmx-config # Deployed with the collector in the previous stepヒント
前のステップで、kafka-jmx-configConfigMapはコレクターとともにデプロイされました。otel.exporter.otlp.endpointの値http://otel-collector.newrelic.svc.cluster.local:4317は、コレクターがサービス名otel-collectorでnewrelicネームスペースにデプロイされていることを前提としています。異なる場合は、実際のコレクターサービスのDNSと一致するように更新してください。
更新したStatefulSetを適用し、ポッドがロールするのを待ちます:
$kubectl apply -f kafka-statefulset.yaml$kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace(オプション) 計装プロデューサーまたは消費者アプリケーション
重要
言語サポート: 現在、OpenTelemetry Javaエージェントを使用したKafkaクライアントの計装でサポートされているのはJavaアプリケーションのみです。
Kubernetesで実行されているKafkaプロデューサーおよび消費者アプリケーションからアプリケーションレベルのテレメトリーを収集するには、それらのアプリケーションポッドにOpenTelemetry Javaエージェントを追加します。
アプリケーションのデプロイメントにinitコンテナと環境変数を追加します:
apiVersion: apps/v1kind: Deploymentmetadata: name: kafka-producer-appspec: template: spec: initContainers: - name: download-otel-agent image: busybox:latest command: - sh - -c - wget -O /otel-agent/opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-agent mountPath: /otel-agent
containers: - name: app image: your-kafka-app:latest env: - name: JAVA_TOOL_OPTIONS value: >- -javaagent:/otel-agent/opentelemetry-javaagent.jar -Dotel.service.name=order-process-service -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.kafka.experimental-span-attributes=true -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -Dotel.instrumentation.kafka.producer-propagation.enabled=true -Dotel.instrumentation.kafka.enabled=true -Dotel.instrumentation.runtime-telemetry.enabled=false volumeMounts: - name: otel-agent mountPath: /otel-agent
volumes: - name: otel-agent emptyDir: {}設定パラメーター
次の表では、キー設定について説明します。
パラメータ | 説明 |
|---|---|
| プロデューサーまたは消費者アプリケーションの一意の名前に置き換えてください。 |
| ブローカーの設定で使用されているのと同じクラスタ名に置き換えます。 |
| コレクターサービスの実際のDNS名(
)に置き換えます |
Javaエージェントは、コードを変更することなくすぐに使えるKafkaの計装を提供し、リクエストレイテンシ、スループットメトリクス、エラー率、およびディストリビューティッド(分散)トレーシングをキャプチャします。高度な設定については、Kafka計装ドキュメントを参照してください。
ブローカーポッドにPrometheus JMX Exporterをインストールし、メトリクスを収集してNew Relicに送信するコレクターをデプロイすることで、包括的なKafka監視をセットアップするには、以下の手順に従ってください。
あなたが始める前に
以下のものを用意してください:
- New Relicアカウント
kubectlアクセスを持つKubernetesクラスタ- ヘッドレスサービス(安定したポッドDNS名用)を使用してStatefulSetとしてデプロイされたKafka
- Kafka StatefulSet を変更および再デプロイする機能
JMX メトリクス ConfigMap の作成
収集するKafkaメトリクスを定義するJMX Exporterの設定を含むConfigMapを作成します。このConfigMapは、各Kafkaブローカーポッドにマウントされます。
kafka-jmx-config.yamlとして保存します。Kafkaがデプロイされているネームスペースに適用します:
apiVersion: v1kind: ConfigMapmetadata: name: kafka-jmx-metrics namespace: kafka # TODO: Replace with your Kafka namespacedata: kafka-metrics-config.yml: | startDelaySeconds: 0 lowercaseOutputName: true lowercaseOutputLabelNames: true
rules: # Cluster-level controller metrics - pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value' name: kafka_cluster_topic_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value' name: kafka_cluster_partition_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value' name: kafka_broker_fenced_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value' name: kafka_partition_non_preferred_leader type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value' name: kafka_partition_offline type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value' name: kafka_controller_active_count type: GAUGE
# Broker-level replica metrics - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value' name: kafka_partition_under_min_isr type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value' name: kafka_broker_leader_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value' name: kafka_partition_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value' name: kafka_partition_under_replicated type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value' name: kafka_max_lag type: GAUGE
# Broker topic metrics (totals) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count' name: kafka_message_count type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "out"
# Per-topic metrics (only appear after traffic flows) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count' name: kafka_prod_msg_count type: COUNTER labels: topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "out"
# Request metrics - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile' name: kafka_request_time_99p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value' name: kafka_request_queue type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value' name: kafka_purgatory_size type: GAUGE labels: type: "$1"
# Controller stats - pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count' name: kafka_leader_election_rate type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count' name: kafka_unclean_election_rate type: COUNTER
# JVM Garbage Collection - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount' name: jvm_gc_collections_count type: COUNTER labels: name: "$1"
# JVM Memory - pattern: 'java.lang<type=Memory><HeapMemoryUsage>max' name: jvm_memory_heap_max type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used' name: jvm_memory_heap_used type: GAUGE
# JVM Threading and System - pattern: 'java.lang<type=Threading><>ThreadCount' name: jvm_thread_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad' name: jvm_system_cpu_utilization type: GAUGE
# Broker uptime - pattern: 'java.lang<type=Runtime><>Uptime' name: kafka_broker_uptime type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above) - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count' name: kafka_request_time_total type: COUNTER labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile' name: kafka_request_time_50p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean' name: kafka_request_time_avg type: GAUGE labels: type: "$1"
# Log flush metrics - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count' name: kafka_logs_flush_count type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile' name: kafka_logs_flush_time_50p type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile' name: kafka_logs_flush_time_99p type: GAUGE
# JVM GC elapsed time - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime' name: jvm_gc_collections_elapsed type: COUNTER labels: name: "$1"
# JVM Memory heap committed - pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed' name: jvm_memory_heap_committed type: GAUGE
# JVM class loading - pattern: 'java.lang<type=ClassLoading><>LoadedClassCount' name: jvm_class_count type: GAUGE
# Additional JVM OS metrics - pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage' name: jvm_system_cpu_load_1m type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors' name: jvm_cpu_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad' name: jvm_cpu_recent_utilization type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount' name: jvm_file_descriptor_count type: GAUGE
# JVM Memory Pool - pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used' name: jvm_memory_pool_used type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max' name: jvm_memory_pool_max type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used' name: jvm_memory_pool_used_after_last_gc type: GAUGE labels: name: "$1"ヒント
メトリクスのカスタマイズ: Prometheus JMX Exporterの例およびKafka MBeanドキュメントを参照して、パターンを追加または変更できます。
ConfigMap を適用します。
$kubectl apply -f kafka-jmx-config.yamlJMX Exporter 用に Kafka StatefulSet を設定する
Kafka StatefulSetにパッチを適用して、Prometheus JMX Exporter JARをダウンロードするinitコンテナを追加し、KAFKA_OPTS経由でKafkaブローカーのJVMにアタッチします。
ステップ1. 既存のKafka StatefulSetマニフェストに以下のセクションを追加します:
spec: template: spec: # 1. Init container: downloads JMX Exporter JAR before Kafka starts initContainers: - name: download-jmx-exporter image: busybox:latest command: - sh - -c - | # Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases. JMX_EXPORTER_VERSION="1.5.0" wget -O /prometheus-jmx/jmx_prometheus_javaagent.jar \ "https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar" volumeMounts: - name: prometheus-jmx mountPath: /prometheus-jmx
containers: - name: kafka # TODO: Replace with your Kafka container name # 2. Attach JMX Exporter as Java agent on port 9404 env: - name: KAFKA_OPTS value: "-javaagent:/prometheus-jmx/jmx_prometheus_javaagent.jar=9404:/jmx-config/kafka-metrics-config.yml" # 3. Expose port 9404 for Prometheus scraping ports: - name: jmx-metrics containerPort: 9404 protocol: TCP volumeMounts: - name: prometheus-jmx mountPath: /prometheus-jmx - name: jmx-config mountPath: /jmx-config
# 4. Volumes: emptyDir for JAR, ConfigMap for metrics config volumes: - name: prometheus-jmx emptyDir: {} - name: jmx-config configMap: name: kafka-jmx-metrics # Must match the ConfigMap name from Step 2ステップ2. 更新したStatefulSetを適用し、ポッドがロールするのを待ちます:
$kubectl apply -f kafka-statefulset.yaml$kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespaceステップ 3。ロールアウトが完了したら、各ブローカーポッドでメトリクスが公開されていることを確認します:
$# Replace kafka-0 and kafka with your pod name and namespace$kubectl exec -n kafka kafka-0 -- curl -s http://localhost:9404/metrics | grep kafka_ | head -20重要
マルチブローカークラスタ: initコンテナとKAFKA_OPTSの設定は、StatefulSet内のすべてのポッドに自動的に適用されます。ロールアウト後、各ブローカーポッドがメトリクスを公開していることを確認します。
OpenTelemetry Collector をデプロイする
クラスタにOpenTelemetry Collectorをデプロイします。コレクターは、静的DNSターゲットを使用してKafkaブローカーのポッドをスクレイプし、インストゥルメントされたアプリケーションからのOTLPデータをポート4317でリッスンします。
Helmメソッドは、 KubernetesのデプロイOpenTelemetry Collectorに推奨されるアプローチです。
ステップ1. New Relicの認証情報シークレットを作成する
ヒント
その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。
ステップ2. コレクター設定を含むvalues.yamlを作成する
NRDOTとOpenTelemetryのコレクターはどちらも同一の設定を使用します。希望するコレクターイメージを選択してください:
高度な設定オプションについては、次の受信機のドキュメント ページを参照してください。
ステップ3. Helmを使用してOpenTelemetry Collectorをインストールする
bash$helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts$helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \>--install \>--namespace newrelic \>--create-namespace \>-f values.yamlステップ 4. デプロイメントを検証します:
bash$# Check pod status$kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector$$# View logs to verify metrics collection$kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50ポート
9404のKafkaブローカーポッドからのスクレイピングが成功したことを示すログが表示されるはずです。
マニフェスト インストレーション メソッドは、 Helmを使用せずにKubernetesリソースを直接制御します。
ステップ1. New Relicの認証情報シークレットを作成する
ヒント
その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。
ステップ2. マニフェストファイルを作成する
NRDOTとOpenTelemetryのコレクターはどちらも同一の設定を使用します。コンテナイメージのみが異なります。
高度な設定オプションについては、次の受信機のドキュメント ページを参照してください。
ステップ3。マニフェストをデプロイする
bash$# Create namespace if it doesn't exist$kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -$$# Apply ConfigMap$kubectl apply -f collector-configmap.yaml$$# Apply Deployment (includes ServiceAccount)$kubectl apply -f collector-deployment.yamlステップ 4. デプロイメントを検証します:
bash$# Check pod status$kubectl get pods -n newrelic -l app=otel-collector$$# View logs to verify metrics collection$kubectl logs -n newrelic -l app=otel-collector --tail=50ポート
9404のKafkaブローカーポッドからのスクレイピングが成功したことを示すログが表示されるはずです。
(オプション) 計装プロデューサーまたは消費者アプリケーション
重要
言語サポート: Javaアプリケーションは、OpenTelemetry Javaエージェントを使用したKafkaクライアントの計装を標準でサポートしています。
Kafkaプロデューサーおよび消費者アプリケーションからアプリケーションレベルのテレメトリーを収集するには、initコンテナでOpenTelemetry Javaエージェントを使用します:
apiVersion: apps/v1kind: Deploymentmetadata: name: kafka-producer-appspec: template: spec: initContainers: - name: download-java-agent image: busybox:latest command: - sh - -c - | wget -O /otel-auto-instrumentation/opentelemetry-javaagent.jar \ https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-auto-instrumentation mountPath: /otel-auto-instrumentation
containers: - name: app image: your-kafka-app:latest env: - name: JAVA_TOOL_OPTIONS value: >- -javaagent:/otel-auto-instrumentation/opentelemetry-javaagent.jar -Dotel.service.name=my-kafka-app -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.kafka.experimental-span-attributes=true -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -Dotel.instrumentation.kafka.producer-propagation.enabled=true -Dotel.instrumentation.kafka.enabled=true -Dotel.instrumentation.runtime-telemetry.enabled=false volumeMounts: - name: otel-auto-instrumentation mountPath: /otel-auto-instrumentation
volumes: - name: otel-auto-instrumentation emptyDir: {}設定パラメーター
次の表では、キー設定について説明します。
| パラメータ | 説明 |
|---|---|
service.name | my-kafka-appプロデューサーまたは消費者アプリケーションの一意の名前に置き換えます。 |
kafka.cluster.name | my-kafka-clusterコレクター設定で使用されているのと同じクラスタ名に置き換えます。 |
otlp.endpoint | エンドポイントhttp://otel-collector.newrelic.svc.cluster.local:4317は、コレクターがnewrelicネームスペースに次のようにデプロイされていると想定しています otel-collector |
Javaエージェントは、コードを変更することなくすぐに使えるKafkaの計装を提供し、リクエストレイテンシ、スループットメトリクス、エラー率、およびディストリビューティッド(分散)トレーシングをキャプチャします。高度な設定については、Kafka計装ドキュメントを参照してください。
(オプション)Kafkaブローカーログを転送する
Kafkaブローカーのログを収集してNew Relicに送信するには、コレクターの設定にfilelogレシーバーを追加します。
データを検索する
数分後、New Relic に Kafka データが表示されるはずです。New Relic UIのさまざまなビューでKafkaデータを探索するための詳細な手順については、データの検索を参照してください。
以下の表は、各シグナルタイプの保存先をまとめています。以下のすべてのクエリで、my-kafka-clusterをKAFKA_CLUSTER_NAMEの値に置き換えます:
| シグナル | イベントタイプ | 含まれるもの |
|---|---|---|
| 指標 | Metric | ブローカー、トピック、パーティション、消費者グループ、およびJVMメトリクス |
| ログ | Log | プロデューサーおよび消費者アプリケーションからのログ(OTel Javaエージェント経由)と、Javaエージェント経由で収集されたブローカーログ |
| トレース | Span | トピックをまたぐメッセージごとのpublishおよびreceive操作を含む、プロデューサーと消費者のスパン |
指標
ブローカー、トピック、パーティション、消費者グループ、およびJVMメトリクスは、Metricイベントタイプに格納されます:
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoログ
OpenTelemetry Javaエージェントでインストゥルメントされたプロデューサーおよび消費者アプリケーションからのログ、およびブローカー上のJavaエージェントを介して収集されたブローカーログは、Logイベントタイプに保存されます:
FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoトレース
トピックをまたぐメッセージごとのpublishおよびreceive操作を含む、プロデューサーと消費者のスパンは、Spanイベントタイプに保存されます:
FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago例
Kafka StatefulSetマニフェスト、Helm値、OTel Collectorの設定、およびサンプルのプロデューサー/消費者アプリケーションを含む完全な動作例は、New Relic OpenTelemetry Examplesリポジトリで利用可能です。
トラブルシューティング
次のステップ
- Kafka メトリクスを調べる- 完全なメトリクスリファレンスを見る
- カスタムダッシュボードの作成- Kafka データの視覚化を構築します
- アラートの設定 — 消費者ラグやレプリカ不足のパーティションなどの重要なメトリクスをモニターします
関連資料
- セルフホスト型Kafka — セルフホスト型(非Kubernetes)環境向けのKafka監視
- Kubernetes Strimzi — Kubernetes上のStrimzi管理のKafka向けKafka監視
- OpenTelemetry Javaエージェント — OTel Javaエージェントの公式ドキュメント
- Prometheus JMX Exporter - Prometheus形式でJMXメトリクスを公開するJavaエージェント
- Prometheusレシーバー — PrometheusメトリクスエンドポイントをスクレイピングするためのOTel Collectorレシーバー
- kafkametricsレシーバー - 消費者ラグおよびトピックメトリクスレシーバーのドキュメント