• /
  • EnglishEspañolFrançais日本語한국어Português
  • ログイン今すぐ開始

この機械翻訳は、参考として提供されています。

英語版と翻訳版に矛盾がある場合は、英語版が優先されます。詳細については、このページを参照してください。

問題を作成する

OpenTelemetryを使用したKubernetes (Strimzi) 上の Kafka の監視

OpenTelemetry Collectorデプロイして、Strimzi オペレーターを使用してKubernetes上で実行されている Kafka クラスターを監視します。 コレクターは、Kafka ブローカー Pod を自動的に検出し、包括的なメトリクスを収集します。

あなたが始める前に

以下のものを用意してください:

Strimzi Kafka で JMX を有効にする

Kafka クラスターの Strimzi Kafka リソースで JMX が有効になっていることを確認します。

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
namespace: kafka
spec:
kafka:
jmxOptions: {} # Enables JMX with default settings
# ...other broker configuration

ステップ 1: ネームスペースを作成する

OpenTelemetry Collector 専用のネームスペースを作成します (または既存の Kafka ネームスペースを使用します)。

bash
$
kubectl create namespace kafka

ステップ2: ライセンスキーを使用してシークレットを作成する

New Relic ライセンスキーを Kubernetes シークレットとして保存します。

bash
$
kubectl create secret generic nr-license-key \
>
--from-literal=NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY \
>
-n kafka

YOUR_LICENSE_KEY実際のNew Relicライセンスキーに置き換えます。

ステップ3: OpenTelemetry Collectorをデプロイする

3.1 カスタムコレクターイメージの構築

Java ランタイムと JMX スクレーパーを使用してカスタム OpenTelemetry Collector イメージを作成します。

重要

バージョンの互換性: このガイドでは、JMX Scraper 1.52.0 と OpenTelemetry Collector 0.143.1 を使用します。古いコレクターのバージョンでは、このスクレーパーのハッシュが互換性リストに含まれていない可能性があります。最良の結果を得るには、このガイドに示されている最新バージョンを使用してください。

目標アーキテクチャー: OpenTelemetry Collectorリリースページを参照して、システム アーキテクチャー (例: linux_amd64linux_arm64darwin_amd64) に適したバイナリを見つけてください。 それに応じて Dockerfile 内のTARGETARCH変数を更新します。

Dockerfileとして保存:

# Multi-stage build for OpenTelemetry Collector with Java support for JMX receiver
# This image bundles the OTEL Collector with Java 17 runtime and JMX scraper JAR
FROM alpine:latest as prep
# OpenTelemetry Collector Binary
ARG OTEL_VERSION=0.143.1
ARG TARGETARCH=linux_amd64
ADD "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_${TARGETARCH}.tar.gz" /otelcontribcol
RUN tar -zxvf /otelcontribcol
# JMX Scraper JAR (for JMX receiver with YAML-based configuration)
ARG JMX_SCRAPER_JAR_VERSION=1.52.0
ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_SCRAPER_JAR_VERSION}/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
# Set permissions for nonroot user (uid 65532)
ARG USER_UID=65532
RUN chown ${USER_UID} /opt/opentelemetry-jmx-scraper.jar
# Final minimal image with Java runtime
FROM openjdk:17-jre-slim
COPY --from=prep /opt/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
COPY --from=prep /otelcol-contrib /otelcol-contrib
EXPOSE 4317 4318 8888
ENTRYPOINT ["/otelcol-contrib"]
CMD ["--config", "/conf/otel-agent-config.yaml"]

イメージをビルドしてプッシュします。

bash
$
docker build -t your-registry/otel-collector-kafka:latest .
$
docker push your-registry/otel-collector-kafka:latest

3.2 JMX カスタムメトリック ConfigMap の作成

まず、カスタム JMX メトリクス設定を使用して ConfigMap を作成します。 jmx-kafka-config.yamlとして保存:

apiVersion: v1
kind: ConfigMap
metadata:
name: jmx-kafka-config
namespace: kafka
data:
jmx-kafka-config.yaml: |
---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages in per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: current heap usage
type: gauge
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

ヒント

メトリクス コレクションをカスタマイズする: カスタム MBean ルールをkafka-jmx-config.yamlファイルに追加することで、追加の Kafka メトリクスをスクレイピングできます。

JMX ConfigMap を適用します。

bash
$
kubectl apply -f jmx-kafka-config.yaml

3.3 コレクター構成マップの作成

OpenTelemetry Collector設定でConfigMapを作成します。 otel-kafka-config.yamlとして保存:

---
apiVersion: v1
kind: ConfigMap
metadata:
name: otel-collector-config
namespace: kafka
labels:
app: otel-collector
data:
otel-collector-config.yaml: |
receivers:
# Kafka cluster-level metrics (runs once per OTEL collector)
kafkametrics/cluster:
brokers:
- "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
protocol_version: 2.8.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
# Receiver creator for dynamic per-broker JMX receivers
receiver_creator:
watch_observers: [k8s_observer]
receivers:
# JMX receiver template (created per discovered broker pod)
jmx:
rule: type == "pod" && labels["strimzi.io/kind"] == "Kafka" && labels["strimzi.io/cluster"] == "my-cluster" && labels["strimzi.io/name"] == "my-cluster-kafka"
config:
endpoint: 'service:jmx:rmi:///jndi/rmi://`endpoint`:9999/jmxrmi'
jar_path: /opt/opentelemetry-jmx-scraper.jar
target_system: kafka
jmx_configs: /conf-jmx/jmx-kafka-config.yaml
collection_interval: 30s
# Set dynamic resource attributes from discovered pod
resource_attributes:
broker.endpoint: '`endpoint`'
exporters:
otlp:
endpoint: https://otlp.nr-data.net:4317
tls:
insecure: false
sending_queue:
num_consumers: 12
queue_size: 5000
retry_on_failure:
enabled: true
headers:
api-key: ${NEW_RELIC_LICENSE_KEY}
processors:
# Batch processor for efficiency
batch/aggregation:
send_batch_size: 1024
timeout: 30s
# Memory limiter to prevent OOM
memory_limiter:
limit_percentage: 80
spike_limit_percentage: 30
check_interval: 1s
# Detect system resources
resourcedetection:
detectors: [env, docker, system]
timeout: 5s
override: false
# Add Kafka cluster metadata
resource/kafka_metadata:
attributes:
- key: kafka.cluster.name
value: my-cluster
action: upsert
# Extract Kubernetes attributes
k8sattributes:
auth_type: serviceAccount
passthrough: false
extract:
metadata:
- k8s.pod.name
- k8s.pod.uid
- k8s.namespace.name
- k8s.node.name
labels:
- tag_name: strimzi.cluster
key: strimzi.io/cluster
from: pod
- tag_name: strimzi.kind
key: strimzi.io/kind
from: pod
# Transform metrics for New Relic UI
transform:
metric_statements:
- context: metric
statements:
# Clean up descriptions and units
- set(description, "") where description != ""
- set(unit, "") where unit != ""
- context: resource
statements:
# Extract broker.id from k8s.pod.name: my-cluster-kafka-0 -> 0 (supports multi-digit)
- set(attributes["broker.id"], ExtractPatterns(attributes["k8s.pod.name"], ".*-(?P<broker_id>\\d+)$")["broker_id"]) where attributes["k8s.pod.name"] != nil
# Remove broker.id for cluster-level metrics
transform/remove_broker_id:
metric_statements:
- context: resource
statements:
- delete_key(attributes, "broker.id")
- delete_key(attributes, "broker.endpoint")
- delete_key(attributes, "k8s.pod.name")
# Topic sum aggregation for replicas_in_sync
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [ topic ]
aggregation_type: sum
# Filter to include only cluster-level metrics
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
# Filter to exclude cluster-level metrics from broker pipeline
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
# Convert cumulative metrics to delta for New Relic
cumulativetodelta:
extensions:
# K8s observer extension
k8s_observer:
auth_type: serviceAccount
observe_pods: true
observe_nodes: false
service:
extensions: [k8s_observer]
pipelines:
# Per-broker metrics pipeline (with broker.id)
metrics/broker:
receivers:
- receiver_creator
- kafkametrics/cluster
processors:
- memory_limiter
- resourcedetection
- resource/kafka_metadata
- k8sattributes
- filter/exclude_cluster_metrics
- transform
- metricstransform/kafka_topic_sum_aggregation
- cumulativetodelta
- batch/aggregation
exporters: [otlp]
# Cluster-level metrics pipeline (without broker.id, aggregated)
metrics/cluster:
receivers:
- receiver_creator
processors:
- memory_limiter
- resourcedetection
- resource/kafka_metadata
- k8sattributes
- filter/include_cluster_metrics
- transform/remove_broker_id
- metricstransform/kafka_topic_sum_aggregation
- cumulativetodelta
- batch/aggregation
exporters: [otlp]

設定メモ:

  • my-cluster-kafka-bootstrap Strimzi Kafka サービス名に置き換えます
  • rulekafka.cluster.namemy-clusterクラスタ名に置き換えます
  • ネームスペースが異なる場合はネームスペースを更新します kafka
  • OTLP エンドポイント: https://otlp.nr-data.net:4317 (米国リージョン) またはhttps://otlp.eu01.nr-data.net:4317 (EU リージョン) を使用します。他の地域のOTLP エンドポイントの構成を参照してください
  • receiver_creatorは Strimzi ラベルを使用して Kafka ブローカー ポッドを自動的に検出します

ConfigMap を適用します。

bash
$
kubectl apply -f otel-kafka-config.yaml

3.4 コレクターをデプロイする

デプロイメントを作成します。otel-collector-deployment.yamlとして保存:

apiVersion: apps/v1
kind: Deployment
metadata:
name: otel-collector
namespace: kafka
spec:
replicas: 1
selector:
matchLabels:
app: otel-collector
template:
metadata:
labels:
app: otel-collector
spec:
serviceAccountName: otel-collector
containers:
- name: otel-collector
image: your-registry/otel-collector-kafka:latest
env:
- name: NEW_RELIC_LICENSE_KEY
valueFrom:
secretKeyRef:
name: nr-license-key
key: NEW_RELIC_LICENSE_KEY
resources:
limits:
cpu: "1"
memory: "2Gi"
requests:
cpu: "500m"
memory: "1Gi"
volumeMounts:
- name: vol-kafka-test-cluster
mountPath: /conf
- name: jmx-config
mountPath: /conf-jmx
ports:
- containerPort: 4317 # OTLP gRPC
- containerPort: 4318 # OTLP HTTP
- containerPort: 8888 # Metrics
volumes:
- name: vol-kafka-test-cluster
configMap:
name: otel-collector-config
items:
- key: otel-collector-config.yaml
path: otel-agent-config.yaml
- name: jmx-config
configMap:
name: jmx-kafka-config
items:
- key: jmx-kafka-config.yaml
path: jmx-kafka-config.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: otel-collector
namespace: kafka
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: otel-collector
rules:
- apiGroups: [""]
resources: ["pods", "nodes"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: otel-collector
subjects:
- kind: ServiceAccount
name: otel-collector
namespace: kafka
roleRef:
kind: ClusterRole
name: otel-collector
apiGroup: rbac.authorization.k8s.io

リソース設定:

  • 上記のリソース制限は、中規模の Kafka クラスター (5~10 ブローカー、20~100 トピック) に適しています。

デプロイメントを適用します。

bash
$
kubectl apply -f otel-collector-deployment.yaml

コレクターが実行中であることを確認します。

bash
$
kubectl get pods -n kafka -l app=otel-collector
$
kubectl logs -n kafka -l app=otel-collector -f

ステップ 4: (オプション) 計装プロデューサーまたは消費者アプリケーション

Kubernetesで実行されている Kafka プロデューサーおよび消費者アプリケーションからアプリケーション レベルのテレメトリーを収集するには、 OpenTelemetry Javaエージェントを使用してそれらを計装します。

Kafka アプリケーションを計装する

Kafka プロデューサーまたは消費者アプリケーションを計測するには、 OpenTelemetry Javaエージェントを既存のデプロイメントに追加します。

  1. Javaエージェントをダウンロードします。init コンテナを追加してエージェント JAR をダウンロードします。

    initContainers:
    - name: download-otel-agent
    image: busybox:latest
    command:
    - sh
    - -c
    - |
    wget -O /otel/opentelemetry-javaagent.jar \
    https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
    volumeMounts:
    - name: otel-agent
    mountPath: /otel
  2. Javaエージェントを構成します: 環境変数をアプリケーション コンテナーに追加します。

    env:
    - name: JAVA_TOOL_OPTIONS
    value: >-
    -javaagent:/otel/opentelemetry-javaagent.jar
    -Dotel.service.name="kafka-producer"
    -Dotel.resource.attributes="kafka.cluster.name=my-cluster"
    -Dotel.exporter.otlp.endpoint="http://localhost:4317"
    -Dotel.exporter.otlp.protocol="grpc"
    -Dotel.metrics.exporter="otlp"
    -Dotel.traces.exporter="otlp"
    -Dotel.logs.exporter="otlp"
    -Dotel.instrumentation.kafka.experimental-span-attributes="true"
    -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true"
    -Dotel.instrumentation.kafka.producer-propagation.enabled="true"
    -Dotel.instrumentation.kafka.enabled="true"
    volumeMounts:
    - name: otel-agent
    mountPath: /otel
  3. ボリュームを追加します: ボリューム定義を含めます:

    volumes:
    - name: otel-agent
    emptyDir: {}

交換する:

  • kafka-producer アプリケーションに固有の名前を付ける
  • my-cluster あなたのKafkaクラスタ名で

ヒント

上記の設定は、localhost:4317 で実行されているOpenTelemetry Collectorにテレメトリーを送信します。 この設定を使用して独自のコレクターを展開します。

receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
exporters:
otlp/newrelic:
endpoint: https://otlp.nr-data.net:4317
headers:
api-key: "${NEW_RELIC_LICENSE_KEY}"
compression: gzip
timeout: 30s
service:
pipelines:
traces:
receivers: [otlp]
exporters: [otlp/newrelic]
metrics:
receivers: [otlp]
exporters: [otlp/newrelic]
logs:
receivers: [otlp]
exporters: [otlp/newrelic]

これにより、処理をカスタマイズしたり、フィルターを追加したり、複数のバックエンドにルーティングしたりできるようになります。その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。

Javaエージェントは、コード変更なしですぐに使用できる Kafka 計装を提供し、以下をキャプチャします。

  • リクエストのレイテンシ
  • スループット メトリクス
  • エラー率
  • 分散型トレース

高度な設定については、 Kafka 計装ドキュメントを参照してください。

ステップ5: (オプション) Kafkaブローカーログを転送する

Kubernetesから Kafka ブローカーのログを収集し、 New Relicに送信するには、 OpenTelemetry Collectorでファイル ログ レシーバーを設定します。

データを検索する

数分後、Kafka メトリクスがNew Relicに表示されるはずです。 New Relic UI のさまざまなビューで Kafka メトリクスを探索する詳細な手順については、 「データの検索」を参照してください。

NRQL を使用してデータをクエリすることもできます。

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'

トラブルシューティング

次のステップ

Copyright © 2026 New Relic株式会社。

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.