OpenTelemetry Collector Linux ホストに直接インストールして、セルフホスト型Apache Kafka クラスターを監視します。
あなたが始める前に
以下のものを用意してください:
監視ホストにOpenJDKがインストールされている
Kafka ブローカーで JMX が有効になっている (通常はポート 9999)
コレクターから Kafka ブローカーへのネットワーク アクセス:
- Bootstrapサーバポート(通常は9092)
- JMX ポート (通常は 9999)
ステップ1: OpenTelemetry Collectorをインストールする
OpenTelemetry Collectorリリースから、ホスト OS 用のOpenTelemetry Collector Contrib バイナリをダウンロードしてインストールします。
ステップ2: JMXスクレーパーをダウンロードする
JMX スクレーパーは、Kafka ブローカー MBean から詳細なメトリクスを収集します。
$# Create directory in user home (no sudo needed)$mkdir -p ~/opentelemetry$curl -L -o ~/opentelemetry/opentelemetry-jmx-scraper.jar \> https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.52.0/opentelemetry-jmx-scraper.jar重要
バージョンの互換性: このガイドでは JMX Scraper 1.52.0 を使用します。古いバージョンの OpenTelemetry Collector では、このスクレーパーのハッシュが互換性リストに含まれていない可能性があります。最良の結果を得るには、この JMX スクレーパー バージョンのサポートが含まれる最新の OpenTelemetry Collector バージョンを使用してください。
ステップ 3: JMX カスタムメトリック設定を作成する
カスタム JMX 設定ファイルを作成して、デフォルトのターゲット システムに含まれていない追加の Kafka メトリクスを収集します。
次の設定でファイル~/opentelemetry/kafka-jmx-config.yamlを作成します。
---rules: # Per-topic custom metrics using custom MBean commands - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages in per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics using controller-based MBeans - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
# Broker uptime metric using JVM Runtime - bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
# Leader count per broker - bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name) CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: current heap usage type: gauge HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) - alert if > CPU count AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor) Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GC (shows retained memory baseline)ヒント
メトリクス コレクションをカスタマイズする: カスタム MBean ルールをkafka-jmx-config.yamlファイルに追加することで、追加の Kafka メトリクスをスクレイピングできます。
JMX メトリクス ルールの基本構文を学習します
利用可能なMBean名はKafka監視ドキュメントで確認してください。
これにより、特定の監視ニーズに基づいて、Kafka ブローカーによって公開された JMX メトリクスを収集できるようになります。
ステップ 4: コレクター設定を作成する
~/opentelemetry/config.yamlにメインのOpenTelemetry Collector設定を作成します。
receivers: # Kafka metrics receiver for cluster-level metrics kafkametrics: brokers: - ${env:KAFKA_BROKER_ADDRESS} protocol_version: 2.8.0 scrapers: - brokers - topics - consumers collection_interval: 30s topic_match: ".*" metrics: kafka.topic.min_insync_replicas: enabled: true kafka.topic.replication_factor: enabled: true kafka.partition.replicas: enabled: false kafka.partition.oldest_offset: enabled: false kafka.partition.current_offset: enabled: false
# JMX receiver for broker-specific metrics jmx/kafka_broker-1: jar_path: ${env:HOME}/opentelemetry/opentelemetry-jmx-scraper.jar endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS} target_system: kafka collection_interval: 30s jmx_configs: ${env:HOME}/opentelemetry/kafka-jmx-config.yaml resource_attributes: broker.id: "1" broker.endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
processors: batch/aggregation: send_batch_size: 1024 timeout: 30s
resourcedetection: detectors: [env, ec2, system] system: resource_attributes: host.name: enabled: true host.id: enabled: true
resource: attributes: - action: insert key: kafka.cluster.name value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id: metric_statements: - context: resource statements: - delete_key(attributes, "broker.id")
filter/include_cluster_metrics: metrics: include: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics: metrics: exclude: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
transform/des_units: metric_statements: - context: metric statements: - set(description, "") where description != "" - set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation: transforms: - include: kafka.partition.replicas_in_sync action: insert new_name: kafka.partition.replicas_in_sync.total operations: - action: aggregate_labels label_set: [ topic ] aggregation_type: sum
exporters: otlp/newrelic: endpoint: https://otlp.nr-data.net:4317 headers: api-key: ${env:NEW_RELIC_LICENSE_KEY} compression: gzip timeout: 30s
service: pipelines: metrics/brokers-cluster-topics: receivers: [jmx/kafka_broker-1, kafkametrics] processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, batch/aggregation] exporters: [otlp/newrelic]
metrics/jmx-cluster: receivers: [jmx/kafka_broker-1] processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/des_units, cumulativetodelta, batch/aggregation] exporters: [otlp/newrelic]設定メモ:
- OTLP エンドポイント:
https://otlp.nr-data.net:4317(米国リージョン) またはhttps://otlp.eu01.nr-data.net:4317(EU リージョン) を使用します。他の地域のOTLP エンドポイントの構成を参照してください
重要
複数のブローカーの場合は、異なるエンドポイントとブローカー ID を持つ追加の JMX レシーバーを追加して、クラスタ内の各ブローカーを監視します。
ステップ5: 環境変数を設定する
必要な環境変数を設定します。
$export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"$export KAFKA_CLUSTER_NAME="my-kafka-cluster"$export KAFKA_BROKER_ADDRESS="localhost:9092"$export KAFKA_BROKER_JMX_ADDRESS="localhost:9999"交換する:
YOUR_LICENSE_KEYNew Relicライセンスキーを使用してmy-kafka-clusterKafka クラスターの一意の名前を付けるlocalhost:9092Kafka ブートストラップ サーバー アドレスを使用してlocalhost:9999Kafka ブローカー JMX エンドポイントを使用して
ステップ6: コレクターを起動する
コレクターを直接実行します (sudo は不要です)。
$# Start the collector with your config$otelcol-contrib --config ~/opentelemetry/config.yamlコレクターは、数分以内に Kafka メトリクスのNew Relicへの送信を開始します。
永続実行用の systemd サービスを作成します (1 回限りのセットアップには sudo が必要です)。
$# Create systemd service file$sudo tee /etc/systemd/system/otelcol-contrib.service > /dev/null <<EOF$[Unit]$Description=OpenTelemetry Collector for Kafka$After=network.target$
$[Service]$Type=simple$User=$USER$WorkingDirectory=$HOME/opentelemetry$ExecStart=/usr/local/bin/otelcol-contrib --config $HOME/opentelemetry/config.yaml$Restart=on-failure$Environment="NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY"$Environment="KAFKA_CLUSTER_NAME=my-kafka-cluster"$Environment="KAFKA_BROKER_ADDRESS=localhost:9092"$Environment="KAFKA_BROKER_JMX_ADDRESS=localhost:9999"$
$[Install]$WantedBy=multi-user.target$EOFYOUR_LICENSE_KEYとその他の値を置き換えて、サービスを有効にして開始します。
$sudo systemctl daemon-reload$sudo systemctl enable otelcol-contrib$sudo systemctl start otelcol-contrib$sudo systemctl status otelcol-contribステップ 7: (オプション) 計装プロデューサーまたは消費者アプリケーション
Kafka プロデューサおよび消費者アプリケーションからアプリケーション レベルのテレメトリーを収集するには、 OpenTelemetry Javaエージェントを使用します。
Javaエージェントをダウンロードします。
bash$mkdir -p ~/otel-java$curl -L -o ~/otel-java/opentelemetry-javaagent.jar \>https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jarエージェントを使用してアプリケーションを開始します。
bash$java \>-javaagent:~/otel-java/opentelemetry-javaagent.jar \>-Dotel.service.name="kafka-producer-1" \>-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \>-Dotel.exporter.otlp.endpoint=https://otlp.nr-data.net:4317 \>-Dotel.exporter.otlp.protocol="grpc" \>-Dotel.metrics.exporter="otlp" \>-Dotel.traces.exporter="otlp" \>-Dotel.logs.exporter="otlp" \>-Dotel.instrumentation.kafka.experimental-span-attributes="true" \>-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \>-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \>-Dotel.instrumentation.kafka.enabled="true" \>-jar your-kafka-application.jar
交換する:
kafka-producer-1プロデューサーまたは消費者アプリケーションの一意の名前を付けるmy-kafka-clusterコレクター設定で使用されているのと同じクラスタ名を持つhttps://otlp.nr-data.net:4317New Relic OTLP エンドポイントを使用します (EU 地域ではhttps://otlp.eu01.nr-data.net:4317を使用します)。その他のエンドポイントおよび設定オプションについては、 「OTLP エンドポイントの構成」を参照してください。
Javaエージェントは、コード変更なしですぐに使用できる Kafka 計装を提供し、以下をキャプチャします。
- リクエストのレイテンシ
- スループット メトリクス
- エラー率
- 分散型トレース
高度な設定については、 Kafka 計装ドキュメントを参照してください。
ステップ6: (オプション) Kafkaブローカーログを転送する
ホストから Kafka ブローカー ログを収集してNew Relicに送信するには、 OpenTelemetry Collectorでファイル ログ レシーバーを構成します。
データを検索する
数分後、Kafka メトリクスがNew Relicに表示されるはずです。 New Relic UI のさまざまなビューで Kafka メトリクスを探索する詳細な手順については、 「データの検索」を参照してください。
NRQL を使用してデータをクエリすることもできます。
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'トラブルシューティング
次のステップ
- Kafka メトリクスを調べる- 完全なメトリクスリファレンスを見る
- カスタムダッシュボードの作成- Kafka データの視覚化を構築します
- アラートのセットアップ- 消費者のラグやレプリケーションが不十分なパーティションなどの重要なメトリクスを監視します