• /
  • EnglishEspañolFrançais日本語한국어Português
  • ログイン今すぐ開始

この機械翻訳は、参考として提供されています。

英語版と翻訳版に矛盾がある場合は、英語版が優先されます。詳細については、このページを参照してください。

問題を作成する

OpenTelemetryを使用した自己ホスト型 Kafka の監視

OpenTelemetry Collector Linux ホストに直接インストールして、セルフホスト型Apache Kafka クラスターを監視します。

あなたが始める前に

以下のものを用意してください:

  • New Relicアカウント

  • 監視ホストにOpenJDKがインストールされている

  • Kafka ブローカーで JMX が有効になっている (通常はポート 9999)

  • コレクターから Kafka ブローカーへのネットワーク アクセス:

    • Bootstrapサーバポート(通常は9092)
    • JMX ポート (通常は 9999)

ステップ1: OpenTelemetry Collectorをインストールする

OpenTelemetry Collectorリリースから、ホスト OS 用のOpenTelemetry Collector Contrib バイナリをダウンロードしてインストールします。

ステップ2: JMXスクレーパーをダウンロードする

JMX スクレーパーは、Kafka ブローカー MBean から詳細なメトリクスを収集します。

bash
$
# Create directory in user home (no sudo needed)
$
mkdir -p ~/opentelemetry
$
curl -L -o ~/opentelemetry/opentelemetry-jmx-scraper.jar \
>
https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.52.0/opentelemetry-jmx-scraper.jar

重要

バージョンの互換性: このガイドでは JMX Scraper 1.52.0 を使用します。古いバージョンの OpenTelemetry Collector では、このスクレーパーのハッシュが互換性リストに含まれていない可能性があります。最良の結果を得るには、この JMX スクレーパー バージョンのサポートが含まれる最新の OpenTelemetry Collector バージョンを使用してください。

ステップ 3: JMX カスタムメトリック設定を作成する

カスタム JMX 設定ファイルを作成して、デフォルトのターゲット システムに含まれていない追加の Kafka メトリクスを収集します。

次の設定でファイル~/opentelemetry/kafka-jmx-config.yamlを作成します。

---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages in per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: current heap usage
type: gauge
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

ヒント

メトリクス コレクションをカスタマイズする: カスタム MBean ルールをkafka-jmx-config.yamlファイルに追加することで、追加の Kafka メトリクスをスクレイピングできます。

ステップ 4: コレクター設定を作成する

~/opentelemetry/config.yamlにメインのOpenTelemetry Collector設定を作成します。

receivers:
# Kafka metrics receiver for cluster-level metrics
kafkametrics:
brokers:
- ${env:KAFKA_BROKER_ADDRESS}
protocol_version: 2.8.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
topic_match: ".*"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
# JMX receiver for broker-specific metrics
jmx/kafka_broker-1:
jar_path: ${env:HOME}/opentelemetry/opentelemetry-jmx-scraper.jar
endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
target_system: kafka
collection_interval: 30s
jmx_configs: ${env:HOME}/opentelemetry/kafka-jmx-config.yaml
resource_attributes:
broker.id: "1"
broker.endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
processors:
batch/aggregation:
send_batch_size: 1024
timeout: 30s
resourcedetection:
detectors: [env, ec2, system]
system:
resource_attributes:
host.name:
enabled: true
host.id:
enabled: true
resource:
attributes:
- action: insert
key: kafka.cluster.name
value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id:
metric_statements:
- context: resource
statements:
- delete_key(attributes, "broker.id")
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
transform/des_units:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [ topic ]
aggregation_type: sum
exporters:
otlp/newrelic:
endpoint: https://otlp.nr-data.net:4317
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
compression: gzip
timeout: 30s
service:
pipelines:
metrics/brokers-cluster-topics:
receivers: [jmx/kafka_broker-1, kafkametrics]
processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, batch/aggregation]
exporters: [otlp/newrelic]
metrics/jmx-cluster:
receivers: [jmx/kafka_broker-1]
processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/des_units, cumulativetodelta, batch/aggregation]
exporters: [otlp/newrelic]

設定メモ:

  • OTLP エンドポイント: https://otlp.nr-data.net:4317 (米国リージョン) またはhttps://otlp.eu01.nr-data.net:4317 (EU リージョン) を使用します。他の地域のOTLP エンドポイントの構成を参照してください

重要

複数のブローカーの場合は、異なるエンドポイントとブローカー ID を持つ追加の JMX レシーバーを追加して、クラスタ内の各ブローカーを監視します。

ステップ5: 環境変数を設定する

必要な環境変数を設定します。

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BROKER_ADDRESS="localhost:9092"
$
export KAFKA_BROKER_JMX_ADDRESS="localhost:9999"

交換する:

  • YOUR_LICENSE_KEY New Relicライセンスキーを使用して
  • my-kafka-cluster Kafka クラスターの一意の名前を付ける
  • localhost:9092 Kafka ブートストラップ サーバー アドレスを使用して
  • localhost:9999 Kafka ブローカー JMX エンドポイントを使用して

ステップ6: コレクターを起動する

コレクターを直接実行します (sudo は不要です)。

bash
$
# Start the collector with your config
$
otelcol-contrib --config ~/opentelemetry/config.yaml

コレクターは、数分以内に Kafka メトリクスのNew Relicへの送信を開始します。

永続実行用の systemd サービスを作成します (1 回限りのセットアップには sudo が必要です)。

bash
$
# Create systemd service file
$
sudo tee /etc/systemd/system/otelcol-contrib.service > /dev/null <<EOF
$
[Unit]
$
Description=OpenTelemetry Collector for Kafka
$
After=network.target
$
$
[Service]
$
Type=simple
$
User=$USER
$
WorkingDirectory=$HOME/opentelemetry
$
ExecStart=/usr/local/bin/otelcol-contrib --config $HOME/opentelemetry/config.yaml
$
Restart=on-failure
$
Environment="NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY"
$
Environment="KAFKA_CLUSTER_NAME=my-kafka-cluster"
$
Environment="KAFKA_BROKER_ADDRESS=localhost:9092"
$
Environment="KAFKA_BROKER_JMX_ADDRESS=localhost:9999"
$
$
[Install]
$
WantedBy=multi-user.target
$
EOF

YOUR_LICENSE_KEYとその他の値を置き換えて、サービスを有効にして開始します。

bash
$
sudo systemctl daemon-reload
$
sudo systemctl enable otelcol-contrib
$
sudo systemctl start otelcol-contrib
$
sudo systemctl status otelcol-contrib

ステップ 7: (オプション) 計装プロデューサーまたは消費者アプリケーション

Kafka プロデューサおよび消費者アプリケーションからアプリケーション レベルのテレメトリーを収集するには、 OpenTelemetry Javaエージェントを使用します。

  1. Javaエージェントをダウンロードします。

    bash
    $
    mkdir -p ~/otel-java
    $
    curl -L -o ~/otel-java/opentelemetry-javaagent.jar \
    >
    https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
  2. エージェントを使用してアプリケーションを開始します。

    bash
    $
    java \
    >
    -javaagent:~/otel-java/opentelemetry-javaagent.jar \
    >
    -Dotel.service.name="kafka-producer-1" \
    >
    -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
    >
    -Dotel.exporter.otlp.endpoint=https://otlp.nr-data.net:4317 \
    >
    -Dotel.exporter.otlp.protocol="grpc" \
    >
    -Dotel.metrics.exporter="otlp" \
    >
    -Dotel.traces.exporter="otlp" \
    >
    -Dotel.logs.exporter="otlp" \
    >
    -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
    >
    -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
    >
    -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
    >
    -Dotel.instrumentation.kafka.enabled="true" \
    >
    -jar your-kafka-application.jar

交換する:

  • kafka-producer-1 プロデューサーまたは消費者アプリケーションの一意の名前を付ける
  • my-kafka-cluster コレクター設定で使用されているのと同じクラスタ名を持つ
  • https://otlp.nr-data.net:4317 New Relic OTLP エンドポイントを使用します (EU 地域ではhttps://otlp.eu01.nr-data.net:4317を使用します)。その他のエンドポイントおよび設定オプションについては、 「OTLP エンドポイントの構成」を参照してください。

Javaエージェントは、コード変更なしですぐに使用できる Kafka 計装を提供し、以下をキャプチャします。

  • リクエストのレイテンシ
  • スループット メトリクス
  • エラー率
  • 分散型トレース

高度な設定については、 Kafka 計装ドキュメントを参照してください。

ステップ6: (オプション) Kafkaブローカーログを転送する

ホストから Kafka ブローカー ログを収集してNew Relicに送信するには、 OpenTelemetry Collectorでファイル ログ レシーバーを構成します。

データを検索する

数分後、Kafka メトリクスがNew Relicに表示されるはずです。 New Relic UI のさまざまなビューで Kafka メトリクスを探索する詳細な手順については、 「データの検索」を参照してください。

NRQL を使用してデータをクエリすることもできます。

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'

トラブルシューティング

次のステップ

Copyright © 2026 New Relic株式会社。

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.