理系学生日記

おまえはいつまで学生気分なのか

OpenTelemetry CollectorでClaude Codeメトリクスに対するDeltaToCumulative処理をスケールアウトさせる

以前、Claude CodeのOpenTelemetry Metricsを収集してGrafanaで可視化する仕組みを構築したという記事を書きました。

あの記事では「とりあえず動いたぜ!」という勢いで締めくくったわけですが、実はあの構成には根本的な問題がありました。そう、僕は問題を先送りにしていたのです。技術ブログでよくあるやつですね。

それは、DeltaToCumulativeというプロセッサのステートフル性が引き起こすスケーラビリティの課題です。今回はその問題と、我々がどう立ち向かったか、つまり2段構成のOpenTelemetry Collectorによる解決策を語ります。

何が問題だったのか

問題を理解するには、まずClaude CodeがどうMetricsを送信し、Amazon Managed Service for Prometheus(以下AMP)が何を期待しているかを知る必要があります。

Claude Codeは、OpenTelemetry MetricsをDelta Temporalityで送信します。Delta Temporalityというのは、「前回のエクスポート以降に増えた分だけを送る」という方式です。例えば、あるカウンタが前回100だったとして、今回120になっていたら、「20増えました」という情報だけを送ります。差分だけを送るので、通信量も少なく効率的です。

一方、AMPが受け付けるのはCumulative Temporalityです。Cumulativeというのは累積値のことで、「最初からの合計値を毎回送る」という方式です。さっきの例で言えば、毎回「今120です」という絶対値を送ります。Prometheusの世界では、この方式が標準なのです。

つまり、Claude CodeとAMPの間には、DeltaとCumulativeという根本的な不一致があります。こういう不一致、システム統合の現場ではよく見かけますよね。誰も悪くないのに、なぜか噛み合わない。この不一致を解消するのが、OpenTelemetry CollectorのDeltaToCumulativeプロセッサです。

DeltaToCumulativeのステートフル性という呪い

DeltaToCumulativeプロセッサは、その名の通りDeltaをCumulativeに変換します。「20増えました」という情報を受け取ったら、「前回は100だったから、今回は120です」という累積値に変換してくれるわけです。

この変換プロセスを図示するとこうなります。

diagram

しかし、よく考えてみてください。「前回は100だった」という情報を、このプロセッサはどこかに覚えておく必要があります。そう、メモリ内に状態を保持しているのです。

この状態保持が、スケールアウトを極めて困難にします。なぜなら、同じ時系列データ(例えば、特定のセッションのtoken_count)は、必ず同じCollectorインスタンスに送られなければならないからです。

もし時系列Aのデータが、あるときはCollector-1に、別のときはCollector-2に送られたらどうなるでしょうか。Collector-1は「前回100だったから、20増えて120です」と計算し、Collector-2は「このデータ初めて見たな、20です」と計算します。同じ時系列なのに、異なる値が2つのCollectorから送信されることになり、データが壊れます。

メトリクスが信頼できなければ、システムの監視なんて意味がありません。

つまり、DeltaToCumulativeプロセッサを使う限り、時系列ごとにスティッキーなルーティングが必須になります。これはステートフルな処理の宿命であり、スケーラビリティの観点からは呪いそのものです。

「スケールアウトしたいなら状態を持つな」という分散システムの鉄則、聞いたことありますよね。我々は今、その鉄則に真っ向から反する状況にいます。さて、どうしたものか。

2段構成という解決策

この呪いを解くためのアーキテクチャが、OpenTelemetry CollectorのTier 1(Gateway)とTier 2(Worker)の2段構成です。基本的な考え方はこうです。

  1. Tier 1(Gateway): ステートレスなプロキシとして動作し、Load Balancing Exporterを使ってメトリクスをTier 2に振り分ける
  2. Tier 2(Worker): ステートフルな処理(DeltaToCumulative)を実行し、AMPに送信する

ここで重要なのは、Load Balancing Exporterが同じ時系列のデータを「基本的には」同じTier 2のWorkerに送る点です。これにより、各Workerが保持するDeltaToCumulativeの状態が正しく保たれます。

この構成により、ステートフルな処理をTier 2に限定し、Tier 1はステートレスに保つことができます。Tier 1はステートレスなので、ALBによるラウンドロビンで負荷分散が可能です。Tier 2も、各GatewayがConsistent Hashで同じstreamIDを同じWorkerに送るため、スケールアウトできます。つまり、両方のTierが独立してスケールアウト可能な構成になっています。

アーキテクチャを図示するとこうなります。

diagram

そしてこの図を見て「複雑そうだな」と思ったあなた、その直感は正しい。めんどくさい。でも、ステートフルな処理をスケールさせるには、こういう案しか出てこなかった。

Load Balancing Exporterの仕組み

Load Balancing Exporterは、その名前から複雑なロジックを想像するかもしれませんが、実際にはシンプルなConsistent Hash Ringを使っています。

Consistent Hashというのは、キーをハッシュ化して、その値に基づいてN個のノードのどれかに振り分けるアルゴリズムです。同じキーは必ず同じノードに振り分けられるため、ステートフルな処理には理想的です。ノードが増減したときの影響も最小限に抑えられます。

diagram

この図が示すように、Hash Ring上には各Workerがハッシュ値によって配置されています。streamIDがやってきたら、そのハッシュ値を計算します。そして、Ring上で時計回りへ進み、最初に見つかるWorkerへ割り当てます。例えば、Hash値1500のstreamIDは、時計回りで次にあるWorker-2(Hash値2000)に送られます。

今回の構成では、routing_key: streamIDを指定しています。streamIDというのは、OpenTelemetryにおいて時系列を一意に識別するIDです。重要なのは、すべてのGatewayインスタンスが同じHash Ringアルゴリズムと同じWorkerリストを使用する点です。これにより、あるリクエストがGateway 1に届いても、別のリクエストがGateway 2に届いても、同じstreamIDは必ず同じTier 2 Workerに送られることが保証されます。

これにより、DeltaToCumulativeプロセッサが保持する状態の整合性が保たれます。特定のセッションのtoken_countは、常にWorker-1で処理され、前回の累積値を正しく参照できるのです。

AWS Service Discoveryによる動的IP解決

ここで1つ問題があります。Tier 2のWorkerをスケールアウトしたとき、各GatewayのLoad Balancing Exporterはどうやって新しいWorkerのIPアドレスを知るのでしょうか。

答えはAWS Cloud Mapです。Tier 2のWorkerをECS Serviceとして動かし、Cloud MapのService Discoveryに登録します。すべてのGatewayインスタンスは、Cloud MapのDNS名を参照することで、動的に変化するWorkerのIPアドレスリストを取得できます。

Workerが3台から5台に増えても、各GatewayのLoad Balancing Exporterは自動的に新しいIPアドレスを発見し、Consistent Hash Ringに追加します。すべてのGatewayが同じDNS名を参照するため、同じWorkerリストを得られ、Hash Ringの整合性が保たれます。運用者が手動でCollectorの設定を変更する必要はありません。つまり、スケールアウトが自動化されているわけです。

設定例を示すと、こんな感じです。

exporters:
  # Load Balancing Exporter: Worker Collectorへ分散配信
  # Consistent hashingでrouting keyに基づいて同じWorkerにルーティング
  loadbalancing:
    protocol:
      otlp:
        # endpoint は設定不要(resolverが自動的に設定)
        timeout: 1s
        tls:
          insecure: true # VPC内部通信のためTLS不要

    # Service Discovery DNS解決設定
    resolver:
      dns:
        hostname: ${service_discovery_hostname} # Service Discovery DNS名(環境ごとに動的に設定)
        port: "4317" # Worker CollectorのOTLP/gRPCポート
        interval: 30s # DNS解決の更新間隔
        timeout: 5s

    # Routing key設定: streamID でルーティング
    # streamIDは、Resource attributes + Data point attributesの
    # 組み合わせで計算されるため、session.id(Data point attribute)も含まれる。
    # 同じstreamIDを持つメトリクスを常に同じWorker Collectorに送信することで、
    # deltatocumulativeのステート管理の整合性を保つ
    routing_key: "streamID"

Workerの数が変わると、WorkerのDNS Aレコードが返すIPアドレスのリストが変わります。各GatewayのLoad Balancing Exporterは定期的にDNS解決し、それぞれのHash Ringを更新します。すべてのGatewayが同じDNS結果を得るため、Hash Ringの状態も同期されます。

OpenTelemetryメトリクスの構造とstreamID

routing_keyの話をする前に、OpenTelemetryにおけるメトリクスデータの構造を理解しておく必要があります。特に、Resource attributesとData point attributesの違いです。

OpenTelemetryのメトリクスは、2種類の属性(ラベル)を持ちます。

Resource attributesは、サービス全体に共通する属性です。例えば、サービス名、バージョン、ホスト情報などがここに含まれます。Claude Codeの場合、service.nameservice.versionhost.archos.typeなどがResource attributesとして送られてきます。これらは、そのサービスのすべてのメトリクスで共通です。

一方、Data point attributesは、各データポイント固有の属性です。こちらには、セッションID、ユーザーID、組織IDなど、より細かい粒度の情報が含まれます。Claude Codeの場合、session.iduser.idorganization.idなどがData point attributesとして送られてきます。

この違いを図示すると、こうなります。

diagram

重要なのは、streamIDがResource attributesとData point attributesの両方から計算される点です。つまり、同じサービスから送られてきても、session.idが異なれば、異なるstreamIDになります。これにより、セッション単位で時系列が区別されるのです。

Claude Codeが送信するメトリクスでは、セッションを識別するsession.idはData point attributes側に存在しています。もしrouting_keyとしてserviceを選んでいたら、Resource attributesだけを見ることになり、異なるセッションのメトリクスが同じWorkerに送られてしまいます。逆に、streamIDを使えば、Resource attributesとData point attributesの両方を考慮したルーティングができるため、セッション単位で正しく振り分けられます。

routing_keyの選択とトレードオフ

Load Balancing Exporterでは、何をrouting_keyにするかが重要な設計判断です。選択肢としては、traceIDservicestreamIDなどがあります。

今回streamIDを選んだ理由は、streamIDがResource attributesとData point attributes(ラベル)の両方を含めたストリーム識別子だからです。Claude Codeが送信してくるメトリクスでは、セッションを識別するsession.idはData point attributes側に存在しています。もしresourceなどのrouting_keyを選んでいたら、このData point attributes側のsession.idを考慮したルーティングができず、同じセッションのメトリクスが異なるWorkerに分散してしまい、DeltaToCumulativeの状態管理が破綻します。streamIDはこのData point attributesも含めて時系列を一意に識別するため、整合性の観点では最適な選択です。

一方、トレードオフもあります。特定の時系列に負荷が偏っている場合、特定のWorkerだけが過負荷になる可能性があります。例えば、あるユーザーが異常に大量のメトリクスを送信していたら、そのユーザーのstreamIDを処理するWorkerだけが高負荷になります。

まあ、すべての要件を満たすアーキテクチャなんてものは存在しません。すべてはトレードオフです。我々は整合性を優先し、負荷の偏りは監視で検知することにしました。

まとめ

  • Claude CodeはDelta TemporalityでMetricsを送信するが、AMPはCumulativeしか受け付けない
  • DeltaToCumulativeプロセッサは状態を保持するため、同じ時系列は同じCollectorに送る必要がある
  • Tier 1(Gateway)とTier 2(Worker)の2段構成により、ステートフル処理をTier 2に限定できる
  • ALBとECS Serviceを使い、Tier 1はステートレスにスケールアウト、Tier 2はConsistent Hashでステートフルにスケールアウト
  • Load Balancing ExporterのConsistent Hash Ringで、時系列単位のスティッキーなルーティングを実現
  • すべてのGatewayが同じHash Ringアルゴリズムを使うため、どのGatewayを経由しても同じWorkerにルーティングされる
  • AWS Service DiscoveryによりWorkerのIPアドレスを動的に解決し、スケールアウトに対応
  • routing_key: streamIDにより整合性を最優先する設計判断

ステートフルな処理のスケーラビリティは、分散システムにおける古典的な課題です。今回のOpenTelemetryの2段構成は、その課題に対する1つの解答案です。Consistent Hashingという古典的アルゴリズムと、Cloud Mapという現代的なService Discoveryを組み合わせることで、実用的なスケーラビリティを実現できました。

「できた気がします」という表現も考えましたが、実際に動いているので「できました」で良いでしょう。前回みたいに「とりあえず動いた!」で終わらせるのは避けたいところです。問題が見つかる未来は見たくない。まあ、どうせ何か問題は出てくるんでしょうけど。