この記事は、Fluentd Advent Calendar 2015 - Qiita の七日目の記事です。
先日、komamitsu/fluency · GitHub というFluentd (in_forward) 用のJava loggerを作ってみました。
元々、fluent/fluent-logger-java · GitHub の開発を見ていたのですが、Fluentd MLで色々と要望があって機能を拡張したい欲が高まったものの既存の実装的に面倒臭そうな感じだったので、ついカッとなってゼロから書いてしまいました。
特徴
特徴としては以下のものがあります。
- 通常 (fluent/fluent-logger-java · GitHub) より三倍速い
- fluency_benchmark.md · GitHub 参照 (後述するPackedForwardを利用した場合)
- Fluentdへの送信において同期 / 非同期のモードを選択できる
- 複数のFluentdを指定しfailoverすることが可能
- Fluentd <-> Fluency間のheartbeatにおいてTCP / UDPの利用を選択できる
- Fluentdへの送信データフォーマットでMessage / PackedForwardを選択できる
- 僕が二年程前に当時存在していた各言語のfluent loggerをざっと見た感じ、全てのloggerで実装が容易なMessageフォーマット ([tag, time, event] という単位で1イベント毎に送信) を用いていた気がします
- Fluentdのout_forwardではPackedForwardフォーマット ([tag, MessagePacked chunk([[time, event], [time, event]], ...])] のようにtag毎に複数のイベントを送信) を使っており、送信側からまとめて送れる。また、Fluentd側ではMessagePackedなchunkをそのままemitできるので性能・CPU負荷的に有利
- FluentdからのACKを待ちat least onceを実現するモードを有効化 / 無効化できる
- イベント送信の度にFluentdからのACKを待つので、例えばFluentd側でbuffer fullな場合、それの解消を待つことによりイベントの取り損ね (送り損ね?) を防ぐことができる
以降では内部の作りについて簡単に説明します。
バッファ管理とFluentdへの送信
既存の fluent/fluent-logger-java · GitHub だとイベント追加時に以下を行っています。
- MessagePackシリアライズされたイベントのデータをメモリ上のバッファにMessageフォーマット ([tag, time, event]) として追加
- Fluentdのin_forwardに対し、バッファ上のデータをTCPで送信
- 成功したらバッファクリア、失敗したら次回イベント追加時にまとめて送信
ただ、この方法だとバッファリングされた複数のイベントを送信する際、ACKレスポンス対応が難しい (各イベント毎にACKを待つためバッファ上のMessagePackシリアライズされたデータをデシリアライズして個々のイベントを取得する必要がある) という問題がありました。また、PackedForwardフォーマットには簡単に対応できません。
そこでFluencyではPackedForwardフォーマットの場合、イベント追加時に以下のようにしています (Messageフォーマットはこれより単純なのでここではPackedForwardを例に...)。
- tag毎にバッファを保持しているMap<String, Buffer>に、既に当該tag用のバッファが存在している場合
- かつ、そのバッファにMessagePackシリアライズされたイベントのデータを書き込む空き容量がある場合は、そのバッファに追記
- バッファに書き込む空き容量がない場合、指定されている割合でバッファを拡張して追記
- そもそも当該tag用のバッファが存在しない場合、初期サイズでバッファを割り当てて、そこに書き込む
- 一定サイズまで拡張されているバッファ、もしくは一定時間フラッシュされていないバッファを送信用のキューに移動 (Map
からはバッファは取り除かれる)
また、自前でバッファプールを持っており、上記処理においてバッファの取得の際はこのプールから取得することでバッファを再利用しています (バッファサイズ拡張時の旧バッファおよびFluentdへのデータ送信完了済みバッファがバッファプールに返却される) 。
Fluentdへのイベントデータ送信時はPackedForwardフォーマットの場合、前述した送信用のキューからtag付きのMessagePackedなchunk ([tag, MessagePacked chunk([[time, event], [time, event]], ...])]) を取り出して送信します。複数のイベントを一括で送信できる上、Fluentd側では受け取ったchunkをそのままemitできるので速度 / CPU負荷ともに効率的です。
同期送信モードの場合はイベント追加APIを呼び出したスレッドで、非同期送信モードの場合は専用スレッドにて上記のイベントデータ送信処理を行います。後者の場合、送信処理で詰まってもイベント追加API呼び出しではブロックされません。
ちなみに、Messageフォーマット ([tag, time, event]) の場合は各イベント毎に送信キューに入れています。PackedForwardに比べると、こちらはシンプルですので詳細は省略します。
Fluentdの死活監視
Fluentdのin_forwardに対しては、UDPまたはTCPにてheartbeatを投げ死活監視を行っています。
heartbeatの失敗 / 遅延に対する判断ロジックについては、元々Fluentdのout_forward <-> in_forward間で ϕ Accrual Failure Detector アルゴリズムが用いられていたため、これをJavaで実装した komamitsu/phi-accural-failure-detector · GitHub を使って実現しています。とはいえ、Fluentdでこのアルゴリズムを使うのはやり過ぎな感もあるため、今後よりシンプルなロジックに差し代わる可能性もありそうです。
また、heartbeatにより利用不可と判断された場合はfailoverとして、secondaryのFluentdを利用します。ただし、利用不可と判断されたprimaryのFluentdが復帰し一定時間経った場合は、再度そちらを利用します。
ACKレスポンス対応
前述したバッファ管理にて説明した通り、送信データ単位 (Messageフォーマットの場合: [tag, time, event], PackedForwardフォーマットの場合: [tag, MessagePacked chunk([[time, event], [time, event]], ...])]) で送信キューに入れていることにより、ACKレスポンスの対応が容易になっています。
具体的にはACKレスポンスモードの場合、各データの末尾にUUIDを付与しFluentdからのレスポンスに同値が含まれていることを確認するまでブロッキング (次のデータを送信しない) することでat least onceを実現しています。ACKを待つために性能への影響は当然0ではありませんが、大量のイベントをまとめて送信するPackedForwardフォーマットとの併用により、性能にそれほど影響を与えずかつat least onceを実現できるかと思います。
活用事例
Fluencyは既に大量のデータ送信が想定されている treasure-data/kafka-fluentd-consumer · GitHub に組み込まれたりしているようです。気軽に使える
fluent/fluent-logger-java · GitHub、タフな環境でも使える komamitsu/fluency · GitHub という使い分けは良さそうな気がするので、Fluentdのfailoverが必要であったり、大量のデータ送信が行われるような環境であれば是非お試し頂いてフィードバックを頂けたら幸いです。
今後の予定
リトライ機能や複数のFluentdを指定しfailoverすることが可能であるため、それ以外のエラーハンドリングには現状力を入れていませんでした。エラー通知用のコールバック等、機能追加していこうかと思います。また、ファイルバッファリングを実装しさらに冗長性を高めるのもありかなぁ...等々