Rustを使ってPostgreSQLの拡張ライブラリ (FDW) を書いてみた話

というタイトルですが、具体的には "Rustで拡張ライブラリ全体を書いた" のではなく "Cで拡張ライブラリの本体を書いてその中からRustのライブラリを呼んで、非同期でRust側からの結果を取得" してます。

ちなみに、Foreign Data Wrapper (FDW) とは、PostgreSQL管理外のデータソースにIOできるPostgreSQLのテーブルの一種です。今回の場合 "Treasure DataというSaaSに裏でクエリーを投げて結果を取得するFDW" となります。

github.com

なぜこんなことをやろうかと思ったか?

という背景がありました。

何が必要?

  1. Treasure DataのAPI経由でSQLを投げて結果を取得できるRust製のクライアントライブラリ
  2. Cで書かれたFDW本体。このFDWに対応したPostgreSQLのテーブルに対してSQLが発行されると、PostgreSQLに登録してあるFDWの関数が適宜コールバックされる(実行計画に必要な情報提供、クエリ開始時、クエリ結果一行読み込み、など)
  3. 上記1と2をつなぐ何か

1 は普通にREST APIのクライアントライブラリを書けば良いので、すでに書いてあり (GitHub - komamitsu/td-client-rust: Rust Client Library for Treasure Data) crates.ioにもpush済み (https://crates.io/crates/td-client)
2 は既存のPostgreSQL FDW等をベースすれば難しくなさそう。Treasure DataはPrestoとHiveをサポートしているので、PostgreSQLの構文との差異を適当に吸収する必要はあり ( "~~" => "LIKE" 等)

この記事では3について説明をしたいと思います。ちなみに3はソースコード的には2と同じプロジェクトに含まれます。

CとRustの連携部分

今回のケースでは、以下のような連携が必要となります

  1. 外部データソースのスキャン開始時に、td-client-rustを用いてTreasure DataにSQLを投げて処理が終わりクエリ結果が生成されるのを待つ。クエリ発行時の初回のみ
  2. 生成されたクエリ結果を1レコードずつ読みRust側からC側に返す。これはPostgreSQL側から要求がある度、通常複数回処理が発生 (LIMIT句で限定されているか、最後までクエリ結果を読みきるまで)

実際のコードではFDWとtd-client-rustの間に橋渡し的なCとRustの関数が入っています。

以降は簡単のためこれらを bridge(C)、bridge(Rust) と呼びます。

全体の流れを図にすると
f:id:komamitsu:20161212004619j:plain
f:id:komamitsu:20161212004628j:plain
このようになります。

クエリ結果の非同期取得

Treasure Dataから返ってきたSQLクエリ結果をPostgreSQLに返すタイミングは、PostgreSQLからの非同期なFDW関数呼び出しとなるため少し工夫が要ります。今回は...

  • bridge(Rust)内で、Treasure Dataのクエリ処理終了を確認後、std::sync::mpsc::channel() でstd::sync::mpsc::SenderとReceiverの組を生成
    let (tx, rx) = mpsc::channel();
  • さらにスレッドを生成し、その中で td-client-rustのtd::client::Client::each_row_in_job_result() を呼び出す。この関数にはクエリ結果1レコードごとに呼ばれるclosureを渡せるので、そのFnの中でレコードをSender::send() でキューに追加するようにする。全レコード取得後、門番的な意味合いのレコードをキュー末尾に入れておく
    thread::spawn(move || {
        match client.each_row_in_job_result(
            job_id,
            &|xs: Vec<Value>| match tx.send(Some(xs)) {
                Ok(()) => (),
                Err(err) => {
                    log!(error_log, "Failed to pass results. job_id={:?}, error={:?}",
                             job_id, err)
                    // TODO: How to propagate this error....?
                }
            }
        ) {
            Ok(()) => match tx.send(None) {
                Ok(()) => (),
                Err(err) => {
                    log!(error_log, "Failed put sentinel in the queue. job_id={:?}, error={:?}",
                             job_id, err)
                }
            },
            Err(err) => {
                log!(error_log, "Failed to fetch result. job_id={:?}, error={:?}",
                         job_id, err)
            }
        }
    });
  • Receiverの方は、そのアドレスをbridge(Rust)、bridge(C) の関数の戻り値としてFDW内に保存
    let query_state = TdQueryState {
        job_id: job_id,
        result_receiver: rx
    };

    let td_query_state = Box::into_raw(Box::new(query_state));

    td_query_state    // <<< return value
  • Treasure Data FDWからbridge(C) -> bridge(Rust) 側にReceiverを渡し、bridge(Rust) でReceiver::recv() を呼びクエリ結果1レコードを取得
    match query_state.result_receiver.recv() {
        Ok(result) => match result {
            Some(xs) => {
                for x in xs.into_iter() {
                    match x {
                        Value::Nil => add_nil(context),
                        Value::Boolean(b) => add_bool(context, b),
                        Value::Integer(Integer::U64(ui)) => add_u64(context, ui),
                        Value::Integer(Integer::I64(si)) => add_i64(context, si),
                        Value::Float(Float::F32(f)) => add_f32(context, f),
                        Value::Float(Float::F64(d)) => add_f64(context, d),
                        Value::String(s) => {
                            let bytes = s.as_bytes();
                            add_string(context, bytes.len(), bytes)
                        },
                        Value::Binary(bs) => {
                            let bytes = bs.as_slice();
                            add_bytes(context, bytes.len(), bytes)
                        },
                        other => {
                            log!(debug_log, "fetch_result_row: {:?} is not supported", other);
                            add_nil(context)
                        },
                    }
                };
                true
            },
            None => {
                drop(td_query_state);
                false
            }
        },
        Err(RecvError) => {
            drop(query_state);
            false
        }
    }
  • bridge(Rust) -> bridge(C) に何とかしてレコードを返し (後述)、それをFDWに戻す

というように、mpsc::channel() で生成される内部的なキューをクエリ結果の受け渡しに使ってみました。

クエリ結果の受け渡し

PostgreSQLからFDWにクエリ結果1レコードを要求された際の値の返し方ですが、試行錯誤 & 二転三転の末、現状以下のようになっています。

  • FDW側でレコードの要素数分のchar *の配列を生成しReceiverと共にbridge(C)に渡す
  • bridge(C)では、char *の配列をvoid *なコンテキストとしてbridge(Rust)に渡す。その際、Receiverと各型に応じたコールバック関数群も渡す
int fetchResultRow(void *td_query_state, int natts, char **values)
{
	int ret;
	fetch_result_context context;

	context.index = 0;
	context.values = values;

	ret = fetch_result_row(
	          td_query_state,
	          &context,
	          add_nil,
	          add_bool,
	          add_u64,
	          add_i64,
	          add_f32,
	          add_f64,
	          add_string,
	          add_bytes,
	          debug_log,
	          error_log);
  • bridge(Rust)では、Receiver::recv()で取得したクエリ結果レコードの要素の、各型に応じたコールバック関数に要素の値とvoid *なコンテキストを渡す
    match query_state.result_receiver.recv() {
        Ok(result) => match result {
            Some(xs) => {
                for x in xs.into_iter() {
                    match x {
                        Value::Nil => add_nil(context),
                        Value::Boolean(b) => add_bool(context, b),
                        Value::Integer(Integer::U64(ui)) => add_u64(context, ui),
                        Value::Integer(Integer::I64(si)) => add_i64(context, si),
                        Value::Float(Float::F32(f)) => add_f32(context, f),
                        Value::Float(Float::F64(d)) => add_f64(context, d),
                        Value::String(s) => {
                            let bytes = s.as_bytes();
                            add_string(context, bytes.len(), bytes)
                        },
                        Value::Binary(bs) => {
                            let bytes = bs.as_slice();
                            add_bytes(context, bytes.len(), bytes)
                        },
                        other => {
                            log!(debug_log, "fetch_result_row: {:?} is not supported", other);
                            add_nil(context)
                        },
                    }
                };
                true
            },
  • bridge(C)のコールバック関数では、コンテキストから復元したchar *の配列の適切な位置に、bridge(Rust)から渡ってきた値を文字列としてコピー
static int add_string(fetch_result_context *context, size_t len, const char *s)
{
	char *buf = (char *)ALLOC(len + 1);
	memcpy(buf, s, len);
	buf[len] = '\0';
	context->values[context->index] = buf;
	context->index++;
	return 0;
}

要するに、"FDWで用意したクエリ結果1レコード用のchar *の配列に各要素を文字列として書き込む" ために bridge(C)->bridge(Rust)->Callback function in bridge(C)というややこしい呼び出しになっています。なぜこのようになっているかというと

  • PostgreSQL内での動的メモリ確保はpalloc()というPostgreSQLの関数を用いることが推奨 (メモリリークの影響を限定) されており
  • 可能であればクエリ結果の各要素の値のコピー時にもpalloc()でメモリ確保したい
  • とはいえ、bridge(Rust)側で直接palloc()を呼び出すのは色々面倒そうなので避けたい

という事情があったりします... しかし、各要素型分のコールバック関数群をbridge(C)->bridge(Rust)で渡すより、palloc()を呼ぶ動的メモリ確保用のコールバックを渡したほうがすっきりするので、後ほど修正しそうな気がします。

C -> Rustでのハマリどころ

他にも色々はまった気がするのですが主なものは以下の通り

RustオブジェクトをCに渡す方法

Boxでヒープに置くだけでは不十分で、mem::transmuteとBox::into_rawのいずれかでRust側でのメモリ管理から外す必要あり。Box::into_rawだとunsafeいらないので少し嬉しい

Rustでraw pointerを複数回dereferenceする場合

Box::from_raw()を使うと元のアドレスが二重で解放されてしまうので、その場合は unsafe { &mut *x } を使う必要あり

この辺りは、clangの "-fsanitize=address" を使うと非常に便利でした。


この、Treasure Data FDWですが色々と修正の余地がありそう、かつPostgreSQLのFDW対応テーブルへのINSERT -> Treasure Dataへのbulk insertもそのうち実装したいので、今後も細々と暇を見つけて弄りたいと思います。