Rustを使ってPostgreSQLの拡張ライブラリ (FDW) を書いてみた話
というタイトルですが、具体的には "Rustで拡張ライブラリ全体を書いた" のではなく "Cで拡張ライブラリの本体を書いてその中からRustのライブラリを呼んで、非同期でRust側からの結果を取得" してます。
ちなみに、Foreign Data Wrapper (FDW) とは、PostgreSQL管理外のデータソースにIOできるPostgreSQLのテーブルの一種です。今回の場合 "Treasure DataというSaaSに裏でクエリーを投げて結果を取得するFDW" となります。
なぜこんなことをやろうかと思ったか?
- もともとはMulticornという "Pythonで外部データソースにアクセスできる処理を書けるFDWフレームワーク" を用いて GitHub - komamitsu/td-fdw: Multicorn based PostgreSQL Foreign Data Wrapper for Treasure Data を作って使っていた。性能的・機能的に改善したい点があるもののMulticorn側で頑張る必要があった
- 半年ほど前からRustに触り始めていて、さらに何となく送ってみたPR https://github.com/rust-lang/rust/pull/33976 も1.11に取り込まれてRust熱が高まっていた
- Rustで外部データソースへのアクセス部分を書いておけば全てCで書くよりは楽だし、Cとの連携も割と楽そうと思った。GitHub - posix4e/rpgffi: R(Rust) PG(Postgresql) FFI (Foreign Function Interface) というのもあり、FDW本体をRustで書く手も無くは無いが、ひとまず無難にCで書いておく方向
という背景がありました。
何が必要?
- Treasure DataのAPI経由でSQLを投げて結果を取得できるRust製のクライアントライブラリ
- Cで書かれたFDW本体。このFDWに対応したPostgreSQLのテーブルに対してSQLが発行されると、PostgreSQLに登録してあるFDWの関数が適宜コールバックされる(実行計画に必要な情報提供、クエリ開始時、クエリ結果一行読み込み、など)
- 上記1と2をつなぐ何か
1 は普通にREST APIのクライアントライブラリを書けば良いので、すでに書いてあり (GitHub - komamitsu/td-client-rust: Rust Client Library for Treasure Data) crates.ioにもpush済み (https://crates.io/crates/td-client)
2 は既存のPostgreSQL FDW等をベースすれば難しくなさそう。Treasure DataはPrestoとHiveをサポートしているので、PostgreSQLの構文との差異を適当に吸収する必要はあり ( "~~" => "LIKE" 等)
この記事では3について説明をしたいと思います。ちなみに3はソースコード的には2と同じプロジェクトに含まれます。
CとRustの連携部分
今回のケースでは、以下のような連携が必要となります
- 外部データソースのスキャン開始時に、td-client-rustを用いてTreasure DataにSQLを投げて処理が終わりクエリ結果が生成されるのを待つ。クエリ発行時の初回のみ
- 生成されたクエリ結果を1レコードずつ読みRust側からC側に返す。これはPostgreSQL側から要求がある度、通常複数回処理が発生 (LIMIT句で限定されているか、最後までクエリ結果を読みきるまで)
実際のコードではFDWとtd-client-rustの間に橋渡し的なCとRustの関数が入っています。
- Cの方は https://github.com/komamitsu/treasuredata_fdw/blob/master/bridge.c
- Rustの方は https://github.com/komamitsu/treasuredata_fdw/blob/master/bridge_td_client_rust/src/lib.rs
以降は簡単のためこれらを bridge(C)、bridge(Rust) と呼びます。
全体の流れを図にすると
このようになります。
クエリ結果の非同期取得
Treasure Dataから返ってきたSQLクエリ結果をPostgreSQLに返すタイミングは、PostgreSQLからの非同期なFDW関数呼び出しとなるため少し工夫が要ります。今回は...
- bridge(Rust)内で、Treasure Dataのクエリ処理終了を確認後、std::sync::mpsc::channel() でstd::sync::mpsc::SenderとReceiverの組を生成
let (tx, rx) = mpsc::channel();
- さらにスレッドを生成し、その中で td-client-rustのtd::client::Client::each_row_in_job_result() を呼び出す。この関数にはクエリ結果1レコードごとに呼ばれるclosureを渡せるので、そのFnの中でレコードをSender::send() でキューに追加するようにする。全レコード取得後、門番的な意味合いのレコードをキュー末尾に入れておく
thread::spawn(move || { match client.each_row_in_job_result( job_id, &|xs: Vec<Value>| match tx.send(Some(xs)) { Ok(()) => (), Err(err) => { log!(error_log, "Failed to pass results. job_id={:?}, error={:?}", job_id, err) // TODO: How to propagate this error....? } } ) { Ok(()) => match tx.send(None) { Ok(()) => (), Err(err) => { log!(error_log, "Failed put sentinel in the queue. job_id={:?}, error={:?}", job_id, err) } }, Err(err) => { log!(error_log, "Failed to fetch result. job_id={:?}, error={:?}", job_id, err) } } });
- Receiverの方は、そのアドレスをbridge(Rust)、bridge(C) の関数の戻り値としてFDW内に保存
let query_state = TdQueryState { job_id: job_id, result_receiver: rx }; let td_query_state = Box::into_raw(Box::new(query_state)); td_query_state // <<< return value
- Treasure Data FDWからbridge(C) -> bridge(Rust) 側にReceiverを渡し、bridge(Rust) でReceiver::recv() を呼びクエリ結果1レコードを取得
match query_state.result_receiver.recv() { Ok(result) => match result { Some(xs) => { for x in xs.into_iter() { match x { Value::Nil => add_nil(context), Value::Boolean(b) => add_bool(context, b), Value::Integer(Integer::U64(ui)) => add_u64(context, ui), Value::Integer(Integer::I64(si)) => add_i64(context, si), Value::Float(Float::F32(f)) => add_f32(context, f), Value::Float(Float::F64(d)) => add_f64(context, d), Value::String(s) => { let bytes = s.as_bytes(); add_string(context, bytes.len(), bytes) }, Value::Binary(bs) => { let bytes = bs.as_slice(); add_bytes(context, bytes.len(), bytes) }, other => { log!(debug_log, "fetch_result_row: {:?} is not supported", other); add_nil(context) }, } }; true }, None => { drop(td_query_state); false } }, Err(RecvError) => { drop(query_state); false } }
- bridge(Rust) -> bridge(C) に何とかしてレコードを返し (後述)、それをFDWに戻す
というように、mpsc::channel() で生成される内部的なキューをクエリ結果の受け渡しに使ってみました。
クエリ結果の受け渡し
PostgreSQLからFDWにクエリ結果1レコードを要求された際の値の返し方ですが、試行錯誤 & 二転三転の末、現状以下のようになっています。
- FDW側でレコードの要素数分のchar *の配列を生成しReceiverと共にbridge(C)に渡す
- bridge(C)では、char *の配列をvoid *なコンテキストとしてbridge(Rust)に渡す。その際、Receiverと各型に応じたコールバック関数群も渡す
int fetchResultRow(void *td_query_state, int natts, char **values) { int ret; fetch_result_context context; context.index = 0; context.values = values; ret = fetch_result_row( td_query_state, &context, add_nil, add_bool, add_u64, add_i64, add_f32, add_f64, add_string, add_bytes, debug_log, error_log);
- bridge(Rust)では、Receiver::recv()で取得したクエリ結果レコードの要素の、各型に応じたコールバック関数に要素の値とvoid *なコンテキストを渡す
match query_state.result_receiver.recv() { Ok(result) => match result { Some(xs) => { for x in xs.into_iter() { match x { Value::Nil => add_nil(context), Value::Boolean(b) => add_bool(context, b), Value::Integer(Integer::U64(ui)) => add_u64(context, ui), Value::Integer(Integer::I64(si)) => add_i64(context, si), Value::Float(Float::F32(f)) => add_f32(context, f), Value::Float(Float::F64(d)) => add_f64(context, d), Value::String(s) => { let bytes = s.as_bytes(); add_string(context, bytes.len(), bytes) }, Value::Binary(bs) => { let bytes = bs.as_slice(); add_bytes(context, bytes.len(), bytes) }, other => { log!(debug_log, "fetch_result_row: {:?} is not supported", other); add_nil(context) }, } }; true },
- bridge(C)のコールバック関数では、コンテキストから復元したchar *の配列の適切な位置に、bridge(Rust)から渡ってきた値を文字列としてコピー
static int add_string(fetch_result_context *context, size_t len, const char *s) { char *buf = (char *)ALLOC(len + 1); memcpy(buf, s, len); buf[len] = '\0'; context->values[context->index] = buf; context->index++; return 0; }
要するに、"FDWで用意したクエリ結果1レコード用のchar *の配列に各要素を文字列として書き込む" ために bridge(C)->bridge(Rust)->Callback function in bridge(C)というややこしい呼び出しになっています。なぜこのようになっているかというと
- PostgreSQL内での動的メモリ確保はpalloc()というPostgreSQLの関数を用いることが推奨 (メモリリークの影響を限定) されており
- 可能であればクエリ結果の各要素の値のコピー時にもpalloc()でメモリ確保したい
- とはいえ、bridge(Rust)側で直接palloc()を呼び出すのは色々面倒そうなので避けたい
という事情があったりします... しかし、各要素型分のコールバック関数群をbridge(C)->bridge(Rust)で渡すより、palloc()を呼ぶ動的メモリ確保用のコールバックを渡したほうがすっきりするので、後ほど修正しそうな気がします。
C -> Rustでのハマリどころ
他にも色々はまった気がするのですが主なものは以下の通り
RustオブジェクトをCに渡す方法
Boxでヒープに置くだけでは不十分で、mem::transmuteとBox::into_rawのいずれかでRust側でのメモリ管理から外す必要あり。Box::into_rawだとunsafeいらないので少し嬉しい
Rustでraw pointerを複数回dereferenceする場合
Box::from_raw()を使うと元のアドレスが二重で解放されてしまうので、その場合は unsafe { &mut *x } を使う必要あり
この辺りは、clangの "-fsanitize=address" を使うと非常に便利でした。
この、Treasure Data FDWですが色々と修正の余地がありそう、かつPostgreSQLのFDW対応テーブルへのINSERT -> Treasure Dataへのbulk insertもそのうち実装したいので、今後も細々と暇を見つけて弄りたいと思います。