JVM Tool Interfaceを少しさわってみたメモ

ふと、JVM(TM) Tool Interface 1.2.3 に触れたことがないことに気がつき、少しさわってみたのでメモ。

OpenJDKのソースコードjdk/src/share/demo/jvmti の下に幾つか例があるので、それを試してみることに。ちなみに概要は上記URLのドキュメントの各APIの説明の直前まで眺めれば良さそう。APIの種類が多いので少し圧倒されるけど、実際に何か作る際に目的に合ったものを見れば問題無いかと。

今回見てみたのは versionCheck / mtrace の二つ。

  • versionCheck

Agent_OnLoad()でJVMTI_EVENT_VM_INITイベントにコールバック関数をセットして、その中でjvmtiEnvから引っこ抜いたバージョン番号をstdoutに表示するだけのシンプルなもの

Mac OS X上でのビルドは以下のような感じ。ちなみにagent_util/agent_util.cはjdk/src/share/demo/jvmtiが依存しているヘルパー関数群。

$ pwd
/Users/komamitsu/src/openjdk/jdk/src/share/demo/jvmti/versionCheck
$ clang -I${JAVA_HOME}/include -I${JAVA_HOME}/include/darwin -I../agent_util -c ../agent_util/agent_util.c *.c
$ clang -dynamiclib -L${JAVA_HOME} -o libversionCheck.so *.o

これを利用する場合は以下のように指定する

$ java -agentpath:/Users/komamitsu/src/openjdk/jdk/src/share/demo/jvmti/versionCheck/libversionCheck.so Main
Compile Time JVMTI Version: 1.2.1 (0x30010201)
Run Time JVMTI Version: 1.2.3 (0x30010203)
Hello world
  • mtrace

Javaのメソッド呼び出し回数をトレースするもの。Agent_OnLoad()で色々コールバックを設定するが、面白いのはJVMTI_EVENT_CLASS_FILE_LOAD_HOOKで都度ロードされたクラスファイルの各メソッドを、同梱されているMtrace.javaのメソッドを最初に呼ぶように書き換えている(と思う)。Mtrace.javaのメソッドは当該Cライブラリファイルの関数(この中でメソッド呼び出し回数をカウント)をnative methodとして呼ぶように紐づけられているが、その際、JNIのFindClass()とRegisterNative()を使っている。ちなみにjava_crw_demo/java_crw_demo.c はクラスファイル操作系ヘルパー関数群。

$ pwd
/Users/komamitsu/src/openjdk/jdk/src/share/demo/jvmti/mtrace
$ clang -I${JAVA_HOME}/include -I${JAVA_HOME}/include/darwin -I../agent_util -I../java_crw_demo -c ../agent_util/agent_util.c ../java_crw_demo/java_crw_demo.c *.c
$ clang -dynamiclib -L${JAVA_HOME} -o libmtrace.so *.o
$ mkdir -p classes
$ javac -d classes Mtrace.java
$ jar cf mtrace.jar *

使い方で少しハマった点としては、FindClass()でMtrace classを探す際、ロードされる前に探しに行くとJVMがクラッシュしてしまう(FindClass()の戻り値がNULLになることを期待してたけど...)。なので -Xbootclasspath/a でmtrace.jarを先にロードする必要あり

$ java -Xbootclasspath/a:/Users/komamitsu/src/openjdk/jdk/src/share/demo/jvmti/mtrace/classes/mtrace.jar -agentpath:/Users/komamitsu/src/openjdk/jdk/src/share/demo/jvmti/mtrace/libmtrace.so  Main
    :
Class java/lang/String 8084 calls
        Method charAt (I)C 3941 calls 3941 returns
        Method length ()I 832 calls 832 returns
        Method equals (Ljava/lang/Object;)Z 543 calls 543 returns
    :

なお、途中で、調査のため-Xcheck:jni -verboseを有効にしたら捗った。特に-verbose指定時に以下のエラーが出たのでJDKのバージョン違いに気がつけた(途中JDK8 -> 7にして試してた)

[Loaded java.lang.ClassFormatError from /Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/rt.jar]
[Loaded java.lang.UnsupportedClassVersionError from /Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/rt.jar]

Rustを使ってPostgreSQLの拡張ライブラリ (FDW) を書いてみた話

というタイトルですが、具体的には "Rustで拡張ライブラリ全体を書いた" のではなく "Cで拡張ライブラリの本体を書いてその中からRustのライブラリを呼んで、非同期でRust側からの結果を取得" してます。

ちなみに、Foreign Data Wrapper (FDW) とは、PostgreSQL管理外のデータソースにIOできるPostgreSQLのテーブルの一種です。今回の場合 "Treasure DataというSaaSに裏でクエリーを投げて結果を取得するFDW" となります。

github.com

なぜこんなことをやろうかと思ったか?

という背景がありました。

何が必要?

  1. Treasure DataのAPI経由でSQLを投げて結果を取得できるRust製のクライアントライブラリ
  2. Cで書かれたFDW本体。このFDWに対応したPostgreSQLのテーブルに対してSQLが発行されると、PostgreSQLに登録してあるFDWの関数が適宜コールバックされる(実行計画に必要な情報提供、クエリ開始時、クエリ結果一行読み込み、など)
  3. 上記1と2をつなぐ何か

1 は普通にREST APIのクライアントライブラリを書けば良いので、すでに書いてあり (GitHub - komamitsu/td-client-rust: Rust Client Library for Treasure Data) crates.ioにもpush済み (https://crates.io/crates/td-client)
2 は既存のPostgreSQL FDW等をベースすれば難しくなさそう。Treasure DataはPrestoとHiveをサポートしているので、PostgreSQLの構文との差異を適当に吸収する必要はあり ( "~~" => "LIKE" 等)

この記事では3について説明をしたいと思います。ちなみに3はソースコード的には2と同じプロジェクトに含まれます。

CとRustの連携部分

今回のケースでは、以下のような連携が必要となります

  1. 外部データソースのスキャン開始時に、td-client-rustを用いてTreasure DataにSQLを投げて処理が終わりクエリ結果が生成されるのを待つ。クエリ発行時の初回のみ
  2. 生成されたクエリ結果を1レコードずつ読みRust側からC側に返す。これはPostgreSQL側から要求がある度、通常複数回処理が発生 (LIMIT句で限定されているか、最後までクエリ結果を読みきるまで)

実際のコードではFDWとtd-client-rustの間に橋渡し的なCとRustの関数が入っています。

以降は簡単のためこれらを bridge(C)、bridge(Rust) と呼びます。

全体の流れを図にすると
f:id:komamitsu:20161212004619j:plain
f:id:komamitsu:20161212004628j:plain
このようになります。

クエリ結果の非同期取得

Treasure Dataから返ってきたSQLクエリ結果をPostgreSQLに返すタイミングは、PostgreSQLからの非同期なFDW関数呼び出しとなるため少し工夫が要ります。今回は...

  • bridge(Rust)内で、Treasure Dataのクエリ処理終了を確認後、std::sync::mpsc::channel() でstd::sync::mpsc::SenderとReceiverの組を生成
    let (tx, rx) = mpsc::channel();
  • さらにスレッドを生成し、その中で td-client-rustのtd::client::Client::each_row_in_job_result() を呼び出す。この関数にはクエリ結果1レコードごとに呼ばれるclosureを渡せるので、そのFnの中でレコードをSender::send() でキューに追加するようにする。全レコード取得後、門番的な意味合いのレコードをキュー末尾に入れておく
    thread::spawn(move || {
        match client.each_row_in_job_result(
            job_id,
            &|xs: Vec<Value>| match tx.send(Some(xs)) {
                Ok(()) => (),
                Err(err) => {
                    log!(error_log, "Failed to pass results. job_id={:?}, error={:?}",
                             job_id, err)
                    // TODO: How to propagate this error....?
                }
            }
        ) {
            Ok(()) => match tx.send(None) {
                Ok(()) => (),
                Err(err) => {
                    log!(error_log, "Failed put sentinel in the queue. job_id={:?}, error={:?}",
                             job_id, err)
                }
            },
            Err(err) => {
                log!(error_log, "Failed to fetch result. job_id={:?}, error={:?}",
                         job_id, err)
            }
        }
    });
  • Receiverの方は、そのアドレスをbridge(Rust)、bridge(C) の関数の戻り値としてFDW内に保存
    let query_state = TdQueryState {
        job_id: job_id,
        result_receiver: rx
    };

    let td_query_state = Box::into_raw(Box::new(query_state));

    td_query_state    // <<< return value
  • Treasure Data FDWからbridge(C) -> bridge(Rust) 側にReceiverを渡し、bridge(Rust) でReceiver::recv() を呼びクエリ結果1レコードを取得
    match query_state.result_receiver.recv() {
        Ok(result) => match result {
            Some(xs) => {
                for x in xs.into_iter() {
                    match x {
                        Value::Nil => add_nil(context),
                        Value::Boolean(b) => add_bool(context, b),
                        Value::Integer(Integer::U64(ui)) => add_u64(context, ui),
                        Value::Integer(Integer::I64(si)) => add_i64(context, si),
                        Value::Float(Float::F32(f)) => add_f32(context, f),
                        Value::Float(Float::F64(d)) => add_f64(context, d),
                        Value::String(s) => {
                            let bytes = s.as_bytes();
                            add_string(context, bytes.len(), bytes)
                        },
                        Value::Binary(bs) => {
                            let bytes = bs.as_slice();
                            add_bytes(context, bytes.len(), bytes)
                        },
                        other => {
                            log!(debug_log, "fetch_result_row: {:?} is not supported", other);
                            add_nil(context)
                        },
                    }
                };
                true
            },
            None => {
                drop(td_query_state);
                false
            }
        },
        Err(RecvError) => {
            drop(query_state);
            false
        }
    }
  • bridge(Rust) -> bridge(C) に何とかしてレコードを返し (後述)、それをFDWに戻す

というように、mpsc::channel() で生成される内部的なキューをクエリ結果の受け渡しに使ってみました。

クエリ結果の受け渡し

PostgreSQLからFDWにクエリ結果1レコードを要求された際の値の返し方ですが、試行錯誤 & 二転三転の末、現状以下のようになっています。

  • FDW側でレコードの要素数分のchar *の配列を生成しReceiverと共にbridge(C)に渡す
  • bridge(C)では、char *の配列をvoid *なコンテキストとしてbridge(Rust)に渡す。その際、Receiverと各型に応じたコールバック関数群も渡す
int fetchResultRow(void *td_query_state, int natts, char **values)
{
	int ret;
	fetch_result_context context;

	context.index = 0;
	context.values = values;

	ret = fetch_result_row(
	          td_query_state,
	          &context,
	          add_nil,
	          add_bool,
	          add_u64,
	          add_i64,
	          add_f32,
	          add_f64,
	          add_string,
	          add_bytes,
	          debug_log,
	          error_log);
  • bridge(Rust)では、Receiver::recv()で取得したクエリ結果レコードの要素の、各型に応じたコールバック関数に要素の値とvoid *なコンテキストを渡す
    match query_state.result_receiver.recv() {
        Ok(result) => match result {
            Some(xs) => {
                for x in xs.into_iter() {
                    match x {
                        Value::Nil => add_nil(context),
                        Value::Boolean(b) => add_bool(context, b),
                        Value::Integer(Integer::U64(ui)) => add_u64(context, ui),
                        Value::Integer(Integer::I64(si)) => add_i64(context, si),
                        Value::Float(Float::F32(f)) => add_f32(context, f),
                        Value::Float(Float::F64(d)) => add_f64(context, d),
                        Value::String(s) => {
                            let bytes = s.as_bytes();
                            add_string(context, bytes.len(), bytes)
                        },
                        Value::Binary(bs) => {
                            let bytes = bs.as_slice();
                            add_bytes(context, bytes.len(), bytes)
                        },
                        other => {
                            log!(debug_log, "fetch_result_row: {:?} is not supported", other);
                            add_nil(context)
                        },
                    }
                };
                true
            },
  • bridge(C)のコールバック関数では、コンテキストから復元したchar *の配列の適切な位置に、bridge(Rust)から渡ってきた値を文字列としてコピー
static int add_string(fetch_result_context *context, size_t len, const char *s)
{
	char *buf = (char *)ALLOC(len + 1);
	memcpy(buf, s, len);
	buf[len] = '\0';
	context->values[context->index] = buf;
	context->index++;
	return 0;
}

要するに、"FDWで用意したクエリ結果1レコード用のchar *の配列に各要素を文字列として書き込む" ために bridge(C)->bridge(Rust)->Callback function in bridge(C)というややこしい呼び出しになっています。なぜこのようになっているかというと

  • PostgreSQL内での動的メモリ確保はpalloc()というPostgreSQLの関数を用いることが推奨 (メモリリークの影響を限定) されており
  • 可能であればクエリ結果の各要素の値のコピー時にもpalloc()でメモリ確保したい
  • とはいえ、bridge(Rust)側で直接palloc()を呼び出すのは色々面倒そうなので避けたい

という事情があったりします... しかし、各要素型分のコールバック関数群をbridge(C)->bridge(Rust)で渡すより、palloc()を呼ぶ動的メモリ確保用のコールバックを渡したほうがすっきりするので、後ほど修正しそうな気がします。

C -> Rustでのハマリどころ

他にも色々はまった気がするのですが主なものは以下の通り

RustオブジェクトをCに渡す方法

Boxでヒープに置くだけでは不十分で、mem::transmuteとBox::into_rawのいずれかでRust側でのメモリ管理から外す必要あり。Box::into_rawだとunsafeいらないので少し嬉しい

Rustでraw pointerを複数回dereferenceする場合

Box::from_raw()を使うと元のアドレスが二重で解放されてしまうので、その場合は unsafe { &mut *x } を使う必要あり

この辺りは、clangの "-fsanitize=address" を使うと非常に便利でした。


この、Treasure Data FDWですが色々と修正の余地がありそう、かつPostgreSQLのFDW対応テーブルへのINSERT -> Treasure Dataへのbulk insertもそのうち実装したいので、今後も細々と暇を見つけて弄りたいと思います。

PostgreSQL内の外部テーブルをBIツールからアクセスできるように試みた話

この記事は Treasure Data Advent Calendar 2015 - Qiita の14日目の記事で、現在12/21です。

なぜこの記事がTreasure DataのAdvent Calendarに入っているかというと、Treasure DataのDataTanksというオプションサービスでPostgreSQLが利用されているためちょっと関係していると個人的に考えています。

DataTanksそのものについては、こちらの記事に良く纏められているのでご参照ください!

qiita.com

このDataTanks、ベースであるPostgreSQLのFDWを用いた外部テーブルを利用することが多く、かつBIツールと接続して利用することが多い、のですが幾つかのBIツールからPostgreSQL上の外部テーブルにアクセスできない(BIツールが外部テーブルの存在を検知できない)問題に気がつきました。

まぁ、workaroundとしては、この外部テーブルをVIEWで包めばBIツールからアクセスできるようになるのですが、あまりスマートではありません。そこで、対応案を考えてみた、というのが今回のお話です。決して、途中まで書いていた記事の内容が思いっきり被っていたので強引に方向展開した訳ではありません。

何が問題なのか?

例えば、TableauをPostgreSQLのデータベースに接続させると以下のようなクエリーが飛んできます。このクエリーによって、テーブルの種別が 'r':通常のテーブル または 'v':ビュー であるものの一覧を取得しています。

BEGIN;
DECLARE "SQL_CUR0x0123456789ab" CURSOR FOR
SELECT relname, nspname, relkind
FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n
WHERE relkind IN ('r', 'v')
AND nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'pg_temp_1')
AND n.oid = relnamespace
ORDER BY nspname, relname;
FETCH 100 IN "SQL_CUR0x0123456789ab";
CLOSE "SQL_CUR0x0123456789ab";

しかし、外部テーブル(foreign table)の種別は 'f' であるため、このクエリ条件の対象とはならず、Tableauからは認識されません。

どうすれば良いのか?

色々調べてみると同様の問題が某所のサポートページに載っているのを発見しましたが、BIツール側で対応しそうな雰囲気は無さそうでした。仕方が無いので、上記のようなクエリーが発行された場合はQuery treeを書き換えて、検索対象のテーブル種別に 'f':外部テーブル も追加できないか検討してみました。

幸いなことにPostgreSQLは拡張ライブラリを書くことで簡単にクエリーの書き換えができるようなので、僕のようなPostgreSQL拡張素人でも何とかなるのではないかと楽観的に考えていました(その時は...)

どうやったか?

post_parse_analyze_hookの利用

他の拡張ライブラリを見るとanalyze済みのQuery構造体にアクセスしているものが幾つかありました。どうやら、backend/parser/analyze.c の post_parse_analyze_hook を利用するとanalyze後に任意の関数を呼んでもらえそうです。

backend/parser/analyze.c

/* Hook for plugins to get control at end of parse analysis */
post_parse_analyze_hook_type post_parse_analyze_hook = NULL;

そこで最初の一歩として、

  • 拡張ライブラリ内に適当にログを吐く関数を作って
  • 拡張ライブラリ内の _PG_init() で上記のフックにその関数を指定し
  • Makefile書いて.soを生成しインストール
  • postgresql.confのshared_preload_librariesにそのライブラリ名を指定し
  • PostgreSQLを起動

とやってみたところ、期待通りにクエリの発行時にログが出力されたので、これは余裕で行けるのでは?感が高まりました。

Query構造体をチェック

拡張ライブラリ内でフックに指定した関数にはanalyze済みのQuery構造体が渡ってきます。なのでこれを見れば、BIツールから飛んできた前述のクエリのように "テーブルを列挙しようとしているが外部テーブルが含まれていない" クエリなのかどうか?が判断できそうです。

Query構造体にはrtableというスキャン対象のテーブル情報が詰まったRangeTblEntry構造体とjointreeというpredicatesが詰まっているFromExpr構造体が含まれています。

include/nodes/parsenodes.h

typedef struct Query
{
    :
    List       *rtable;         /* list of range table entries */
    FromExpr   *jointree;       /* table join tree (FROM and WHERE clauses) */
    :

RangeTblEntry構造体にはOid relidが含まれており、これを辿ってRelationData構造体経由でpg_class用の構造体にアクセスできそうです。ちなみにPostgreSQLの実装内ではList構造体が使われることが多いのですが、便利な一方、実際の型が読み取れないのでgrepやctagsでコードを追いにくいなぁと思いました。

include/nodes/parsenodes.h

typedef struct RangeTblEntry
{   
    NodeTag     type;
    RTEKind     rtekind;        /* see above */
    Oid         relid;          /* OID of the relation */
    char        relkind;        /* relation kind (see pg_class.relkind) */
    Alias      *eref;           /* expanded reference names */
        :

include/utils/rel.h

typedef struct RelationData
{
        :
    Form_pg_class rd_rel;       /* RELATION tuple */
        :

include/catalog/pg_class.h

CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
{
    NameData    relname;        /* class name */
    Oid         relnamespace;   /* OID of namespace containing this class */
    Oid         reltype;        /* OID of entry in pg_type for table's
                                 * implicit row type */
        :

pg_class用の構造体にはrelnameやrelnamespaceが含まれているので、これらを用いるとクエリ対象のテーブルが "pg_catalog.pg_class" であるかチェックできます。

次に当該クエリがテーブル種別を表すrelkindを参照しているかどうかチェックし、参照している場合はクエリ上におけるrelkindの登場位置を覚えておきます。この情報は後ほど利用します。チェックの仕方はRangeTblEntry構造体に含まれるerefのcolnamesをなめていけば可能そうです。もう勝ったも同然な気持ちでいました。

include/nodes/primnodes.h

typedef struct Alias
{   
    NodeTag     type;
    char       *aliasname;      /* aliased rel name (never qualified) */
    List       *colnames;       /* optional list of column aliases */
} Alias;

クエリの条件書き換え

PostgreSQLにはexpression_tree_walker()というNodeをtraverseするための関数が用意されています。それ用の関数を自分で書くのは面倒だなぁと思っていたのでとても助かりました。これが無かったらこの拡張ライブラリは半日で書けなかったと思います。

で、expression_tree_walkerにはvisitorとしての処理を行う関数を渡してあげるのですが、この関数内では以下のことを行うようにしました。

  • 対象のNodeがScalarArrayOpExprである場合(例:"n in (1, 2, 3)")
  • かつ、pg_catalog.pg_class.relkindを参照している場合(前述のRangeTblEntry->eref->colnamesチェック時に保存した位置を利用)
  • かつ、IN句の条件に 'r':標準のテーブル が含まれており、'f':外部テーブル が含まれていない場合
  • ScalarArrayOpExprの条件配列に 'f':外部テーブル を追加する

本来は、IN句を用いずにrelkindがチェックされるケース(例:"relkind = 'r' and relkind = 'v'")を考慮しOpExprも見るべきなのですが、BIツールから発行されるクエリーで今のところそのようなものを観測していないこと、及び、このケースのクエリ書き換えの場合、traverse対象Nodeの上位Nodeを書き換える必要があり素人にはちょっと面倒くさそう、といった理由で未実装となっています。

以上のような感じで実装してみたところ、無事想定するクエリを書き換えることができました。

BEGIN;
DECLARE "SQL_CUR0x0123456789ab" CURSOR FOR
SELECT relname, nspname, relkind
FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n
WHERE relkind IN ('r', 'v')
AND nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'pg_temp_1')
AND n.oid = relnamespace
ORDER BY nspname, relname;
FETCH 100 IN "SQL_CUR0x0123456789ab"; 
         relname         | nspname | relkind 
-------------------------+---------+---------
 ftex_test_foreign_table | public  | f
(1 row)

現在の状況と今後の予定

現在は komamitsu/foreign_table_exposer · GitHub 上で開発を進めているので、何かフィードバックがあれば是非issue登録やpull requestを頂けると有難いです。あとPGNXにも登録してみましたので、こちらからも利用できると思います(foreign_table_exposer: Expose foreign tables as a regular table / PostgreSQL Extension Network)。

今後は、以下のような場合に問題が出そうなので、もう少し情報を集めつつ、必要であれば発動条件の制限をしていこうかと思っています。

  • pg_catalog.pg_class.relkind経由でテーブルを列挙し、標準テーブルのみ扱うことを想定し外部テーブルを扱うと問題があるケース

今回初めてPostgreSQLの拡張ライブラリを書いてみたのですが、簡単に色々なことができるので非常に面白かったです。以前、komamitsu/td-fdw · GitHub というMulticornを用いたFDWを書いたのですが、機会があればCで書き直せそうだなぁと思いました。

FluencyというYet another fluent Java loggerを作った話

この記事は、Fluentd Advent Calendar 2015 - Qiita の七日目の記事です。

先日、komamitsu/fluency · GitHub というFluentd (in_forward) 用のJava loggerを作ってみました。

元々、fluent/fluent-logger-java · GitHub の開発を見ていたのですが、Fluentd MLで色々と要望があって機能を拡張したい欲が高まったものの既存の実装的に面倒臭そうな感じだったので、ついカッとなってゼロから書いてしまいました。

特徴

特徴としては以下のものがあります。

  • 通常 (fluent/fluent-logger-java · GitHub) より三倍速い
  • Fluentdへの送信において同期 / 非同期のモードを選択できる
  • 複数のFluentdを指定しfailoverすることが可能
  • Fluentd <-> Fluency間のheartbeatにおいてTCP / UDPの利用を選択できる
  • Fluentdへの送信データフォーマットでMessage / PackedForwardを選択できる
    • 僕が二年程前に当時存在していた各言語のfluent loggerをざっと見た感じ、全てのloggerで実装が容易なMessageフォーマット ([tag, time, event] という単位で1イベント毎に送信) を用いていた気がします
    • Fluentdのout_forwardではPackedForwardフォーマット ([tag, MessagePacked chunk([[time, event], [time, event]], ...])] のようにtag毎に複数のイベントを送信) を使っており、送信側からまとめて送れる。また、Fluentd側ではMessagePackedなchunkをそのままemitできるので性能・CPU負荷的に有利
  • FluentdからのACKを待ちat least onceを実現するモードを有効化 / 無効化できる
    • イベント送信の度にFluentdからのACKを待つので、例えばFluentd側でbuffer fullな場合、それの解消を待つことによりイベントの取り損ね (送り損ね?) を防ぐことができる

以降では内部の作りについて簡単に説明します。

バッファ管理とFluentdへの送信

既存の fluent/fluent-logger-java · GitHub だとイベント追加時に以下を行っています。

  • MessagePackシリアライズされたイベントのデータをメモリ上のバッファにMessageフォーマット ([tag, time, event]) として追加
  • Fluentdのin_forwardに対し、バッファ上のデータをTCPで送信
  • 成功したらバッファクリア、失敗したら次回イベント追加時にまとめて送信

ただ、この方法だとバッファリングされた複数のイベントを送信する際、ACKレスポンス対応が難しい (各イベント毎にACKを待つためバッファ上のMessagePackシリアライズされたデータをデシリアライズして個々のイベントを取得する必要がある) という問題がありました。また、PackedForwardフォーマットには簡単に対応できません。

そこでFluencyではPackedForwardフォーマットの場合、イベント追加時に以下のようにしています (Messageフォーマットはこれより単純なのでここではPackedForwardを例に...)。

f:id:komamitsu:20151207001354j:plain

  • tag毎にバッファを保持しているMap<String, Buffer>に、既に当該tag用のバッファが存在している場合
    • かつ、そのバッファにMessagePackシリアライズされたイベントのデータを書き込む空き容量がある場合は、そのバッファに追記
    • バッファに書き込む空き容量がない場合、指定されている割合でバッファを拡張して追記
  • そもそも当該tag用のバッファが存在しない場合、初期サイズでバッファを割り当てて、そこに書き込む
  • 一定サイズまで拡張されているバッファ、もしくは一定時間フラッシュされていないバッファを送信用のキューに移動 (Mapからはバッファは取り除かれる)

また、自前でバッファプールを持っており、上記処理においてバッファの取得の際はこのプールから取得することでバッファを再利用しています (バッファサイズ拡張時の旧バッファおよびFluentdへのデータ送信完了済みバッファがバッファプールに返却される) 。

Fluentdへのイベントデータ送信時はPackedForwardフォーマットの場合、前述した送信用のキューからtag付きのMessagePackedなchunk ([tag, MessagePacked chunk([[time, event], [time, event]], ...])]) を取り出して送信します。複数のイベントを一括で送信できる上、Fluentd側では受け取ったchunkをそのままemitできるので速度 / CPU負荷ともに効率的です。

同期送信モードの場合はイベント追加APIを呼び出したスレッドで、非同期送信モードの場合は専用スレッドにて上記のイベントデータ送信処理を行います。後者の場合、送信処理で詰まってもイベント追加API呼び出しではブロックされません。

ちなみに、Messageフォーマット ([tag, time, event]) の場合は各イベント毎に送信キューに入れています。PackedForwardに比べると、こちらはシンプルですので詳細は省略します。

Fluentdの死活監視

Fluentdのin_forwardに対しては、UDPまたはTCPにてheartbeatを投げ死活監視を行っています。

heartbeatの失敗 / 遅延に対する判断ロジックについては、元々Fluentdのout_forward <-> in_forward間で ϕ Accrual Failure Detector アルゴリズムが用いられていたため、これをJavaで実装した komamitsu/phi-accural-failure-detector · GitHub を使って実現しています。とはいえ、Fluentdでこのアルゴリズムを使うのはやり過ぎな感もあるため、今後よりシンプルなロジックに差し代わる可能性もありそうです。

また、heartbeatにより利用不可と判断された場合はfailoverとして、secondaryのFluentdを利用します。ただし、利用不可と判断されたprimaryのFluentdが復帰し一定時間経った場合は、再度そちらを利用します。

ACKレスポンス対応

前述したバッファ管理にて説明した通り、送信データ単位 (Messageフォーマットの場合: [tag, time, event], PackedForwardフォーマットの場合: [tag, MessagePacked chunk([[time, event], [time, event]], ...])]) で送信キューに入れていることにより、ACKレスポンスの対応が容易になっています。

具体的にはACKレスポンスモードの場合、各データの末尾にUUIDを付与しFluentdからのレスポンスに同値が含まれていることを確認するまでブロッキング (次のデータを送信しない) することでat least onceを実現しています。ACKを待つために性能への影響は当然0ではありませんが、大量のイベントをまとめて送信するPackedForwardフォーマットとの併用により、性能にそれほど影響を与えずかつat least onceを実現できるかと思います。

活用事例

Fluencyは既に大量のデータ送信が想定されている treasure-data/kafka-fluentd-consumer · GitHub に組み込まれたりしているようです。気軽に使える
fluent/fluent-logger-java · GitHub、タフな環境でも使える komamitsu/fluency · GitHub という使い分けは良さそうな気がするので、Fluentdのfailoverが必要であったり、大量のデータ送信が行われるような環境であれば是非お試し頂いてフィードバックを頂けたら幸いです。

今後の予定

リトライ機能や複数のFluentdを指定しfailoverすることが可能であるため、それ以外のエラーハンドリングには現状力を入れていませんでした。エラー通知用のコールバック等、機能追加していこうかと思います。また、ファイルバッファリングを実装しさらに冗長性を高めるのもありかなぁ...等々

"Presto in Treasure Data" @ db tech showcase 2015 tokyo

という題で話をしてきました。 www.insight-tec.com


スライドはこちら。

www.slideshare.net

PrestoのPlan

Prestoのコードを眺めていて、com.facebook.presto.execution.SqlQueryExecution#doAnalyzeQuery内で生成されるPlanを見てみたかったのでPrestoのJVMをアタッチしてPlanクラスの中を覗いてみた。

対象のクエリはこちら(catalogはtpch)。

select c.nationkey, count(1) from orders o 
join customer c on o.custkey = c.custkey
where o.orderpriority = '1-URGENT' group by c.nationkey

Planの結果はこれ。
f:id:komamitsu:20150405233413p:plain

Subplanを経由してのStageExecutionPlan:
f:id:komamitsu:20150426235803p:plain

MessagePack v07とJacksonの文字列を対象とした性能比較

        int cnt = 800000;

        {
            ObjectMapper mapper = new ObjectMapper();
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            List<String> strList = new ArrayList<String>(cnt);
            for (int i = 0; i < cnt; i++) {
                strList.add("01234567");
            }

            System.gc();
            Thread.sleep(3000);

            long start = System.currentTimeMillis();
            mapper.writeValue(outputStream, strList);
            System.out.println("Jackson(Serialize, String): " + (System.currentTimeMillis() - start) + "ms");

            System.gc();
            Thread.sleep(3000);

            start = System.currentTimeMillis();
            mapper.readValue(outputStream.toByteArray(), new TypeReference<List<String>>() {});
            System.out.println("Jackson(Deserialize, String): " + (System.currentTimeMillis() - start) + "ms");
        }

        {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            MessagePacker packer = org.msgpack.core.MessagePack.newDefaultPacker(outputStream);

            System.gc();
            Thread.sleep(3000);

            long start = System.currentTimeMillis();
            for (int i = 0; i < cnt; i++) {
                packer.packString("01234567");
            }
            packer.flush();

            System.out.println("MessagePack(Serialize, String): " + (System.currentTimeMillis() - start) + "ms");

            MessageUnpacker unpacker = org.msgpack.core.MessagePack.newDefaultUnpacker(outputStream.toByteArray());

            System.gc();
            Thread.sleep(3000);

            start = System.currentTimeMillis();
            for (int i = 0; i < cnt; i++) {
                unpacker.unpackString();
            }
            System.out.println("MessagePack(Deserialize, String): " + (System.currentTimeMillis() - start) + "ms");
        }

int cnt = 1000000;

Jackson(Serialize, String): 71ms
Jackson(Deserialize, String): 60ms
MessagePack(Serialize, String): 96ms
MessagePack(Deserialize, String): 82ms

int cnt = 1000000;

Jackson(Serialize, String): 103ms
Jackson(Deserialize, String): 246ms
MessagePack(Serialize, String): 192ms
MessagePack(Deserialize, String): 158ms

int cnt = 10000000;

Jackson(Serialize, String): 574ms
Jackson(Deserialize, String): 7259ms
MessagePack(Serialize, String): 1316ms
MessagePack(Deserialize, String): 1049ms