PostgreSQL内の外部テーブルをBIツールからアクセスできるように試みた話

この記事は Treasure Data Advent Calendar 2015 - Qiita の14日目の記事で、現在12/21です。

なぜこの記事がTreasure DataのAdvent Calendarに入っているかというと、Treasure DataのDataTanksというオプションサービスでPostgreSQLが利用されているためちょっと関係していると個人的に考えています。

DataTanksそのものについては、こちらの記事に良く纏められているのでご参照ください!

qiita.com

このDataTanks、ベースであるPostgreSQLのFDWを用いた外部テーブルを利用することが多く、かつBIツールと接続して利用することが多い、のですが幾つかのBIツールからPostgreSQL上の外部テーブルにアクセスできない(BIツールが外部テーブルの存在を検知できない)問題に気がつきました。

まぁ、workaroundとしては、この外部テーブルをVIEWで包めばBIツールからアクセスできるようになるのですが、あまりスマートではありません。そこで、対応案を考えてみた、というのが今回のお話です。決して、途中まで書いていた記事の内容が思いっきり被っていたので強引に方向展開した訳ではありません。

何が問題なのか?

例えば、TableauをPostgreSQLのデータベースに接続させると以下のようなクエリーが飛んできます。このクエリーによって、テーブルの種別が 'r':通常のテーブル または 'v':ビュー であるものの一覧を取得しています。

BEGIN;
DECLARE "SQL_CUR0x0123456789ab" CURSOR FOR
SELECT relname, nspname, relkind
FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n
WHERE relkind IN ('r', 'v')
AND nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'pg_temp_1')
AND n.oid = relnamespace
ORDER BY nspname, relname;
FETCH 100 IN "SQL_CUR0x0123456789ab";
CLOSE "SQL_CUR0x0123456789ab";

しかし、外部テーブル(foreign table)の種別は 'f' であるため、このクエリ条件の対象とはならず、Tableauからは認識されません。

どうすれば良いのか?

色々調べてみると同様の問題が某所のサポートページに載っているのを発見しましたが、BIツール側で対応しそうな雰囲気は無さそうでした。仕方が無いので、上記のようなクエリーが発行された場合はQuery treeを書き換えて、検索対象のテーブル種別に 'f':外部テーブル も追加できないか検討してみました。

幸いなことにPostgreSQLは拡張ライブラリを書くことで簡単にクエリーの書き換えができるようなので、僕のようなPostgreSQL拡張素人でも何とかなるのではないかと楽観的に考えていました(その時は...)

どうやったか?

post_parse_analyze_hookの利用

他の拡張ライブラリを見るとanalyze済みのQuery構造体にアクセスしているものが幾つかありました。どうやら、backend/parser/analyze.c の post_parse_analyze_hook を利用するとanalyze後に任意の関数を呼んでもらえそうです。

backend/parser/analyze.c

/* Hook for plugins to get control at end of parse analysis */
post_parse_analyze_hook_type post_parse_analyze_hook = NULL;

そこで最初の一歩として、

  • 拡張ライブラリ内に適当にログを吐く関数を作って
  • 拡張ライブラリ内の _PG_init() で上記のフックにその関数を指定し
  • Makefile書いて.soを生成しインストール
  • postgresql.confのshared_preload_librariesにそのライブラリ名を指定し
  • PostgreSQLを起動

とやってみたところ、期待通りにクエリの発行時にログが出力されたので、これは余裕で行けるのでは?感が高まりました。

Query構造体をチェック

拡張ライブラリ内でフックに指定した関数にはanalyze済みのQuery構造体が渡ってきます。なのでこれを見れば、BIツールから飛んできた前述のクエリのように "テーブルを列挙しようとしているが外部テーブルが含まれていない" クエリなのかどうか?が判断できそうです。

Query構造体にはrtableというスキャン対象のテーブル情報が詰まったRangeTblEntry構造体とjointreeというpredicatesが詰まっているFromExpr構造体が含まれています。

include/nodes/parsenodes.h

typedef struct Query
{
    :
    List       *rtable;         /* list of range table entries */
    FromExpr   *jointree;       /* table join tree (FROM and WHERE clauses) */
    :

RangeTblEntry構造体にはOid relidが含まれており、これを辿ってRelationData構造体経由でpg_class用の構造体にアクセスできそうです。ちなみにPostgreSQLの実装内ではList構造体が使われることが多いのですが、便利な一方、実際の型が読み取れないのでgrepやctagsでコードを追いにくいなぁと思いました。

include/nodes/parsenodes.h

typedef struct RangeTblEntry
{   
    NodeTag     type;
    RTEKind     rtekind;        /* see above */
    Oid         relid;          /* OID of the relation */
    char        relkind;        /* relation kind (see pg_class.relkind) */
    Alias      *eref;           /* expanded reference names */
        :

include/utils/rel.h

typedef struct RelationData
{
        :
    Form_pg_class rd_rel;       /* RELATION tuple */
        :

include/catalog/pg_class.h

CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
{
    NameData    relname;        /* class name */
    Oid         relnamespace;   /* OID of namespace containing this class */
    Oid         reltype;        /* OID of entry in pg_type for table's
                                 * implicit row type */
        :

pg_class用の構造体にはrelnameやrelnamespaceが含まれているので、これらを用いるとクエリ対象のテーブルが "pg_catalog.pg_class" であるかチェックできます。

次に当該クエリがテーブル種別を表すrelkindを参照しているかどうかチェックし、参照している場合はクエリ上におけるrelkindの登場位置を覚えておきます。この情報は後ほど利用します。チェックの仕方はRangeTblEntry構造体に含まれるerefのcolnamesをなめていけば可能そうです。もう勝ったも同然な気持ちでいました。

include/nodes/primnodes.h

typedef struct Alias
{   
    NodeTag     type;
    char       *aliasname;      /* aliased rel name (never qualified) */
    List       *colnames;       /* optional list of column aliases */
} Alias;

クエリの条件書き換え

PostgreSQLにはexpression_tree_walker()というNodeをtraverseするための関数が用意されています。それ用の関数を自分で書くのは面倒だなぁと思っていたのでとても助かりました。これが無かったらこの拡張ライブラリは半日で書けなかったと思います。

で、expression_tree_walkerにはvisitorとしての処理を行う関数を渡してあげるのですが、この関数内では以下のことを行うようにしました。

  • 対象のNodeがScalarArrayOpExprである場合(例:"n in (1, 2, 3)")
  • かつ、pg_catalog.pg_class.relkindを参照している場合(前述のRangeTblEntry->eref->colnamesチェック時に保存した位置を利用)
  • かつ、IN句の条件に 'r':標準のテーブル が含まれており、'f':外部テーブル が含まれていない場合
  • ScalarArrayOpExprの条件配列に 'f':外部テーブル を追加する

本来は、IN句を用いずにrelkindがチェックされるケース(例:"relkind = 'r' and relkind = 'v'")を考慮しOpExprも見るべきなのですが、BIツールから発行されるクエリーで今のところそのようなものを観測していないこと、及び、このケースのクエリ書き換えの場合、traverse対象Nodeの上位Nodeを書き換える必要があり素人にはちょっと面倒くさそう、といった理由で未実装となっています。

以上のような感じで実装してみたところ、無事想定するクエリを書き換えることができました。

BEGIN;
DECLARE "SQL_CUR0x0123456789ab" CURSOR FOR
SELECT relname, nspname, relkind
FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n
WHERE relkind IN ('r', 'v')
AND nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'pg_temp_1')
AND n.oid = relnamespace
ORDER BY nspname, relname;
FETCH 100 IN "SQL_CUR0x0123456789ab"; 
         relname         | nspname | relkind 
-------------------------+---------+---------
 ftex_test_foreign_table | public  | f
(1 row)

現在の状況と今後の予定

現在は komamitsu/foreign_table_exposer · GitHub 上で開発を進めているので、何かフィードバックがあれば是非issue登録やpull requestを頂けると有難いです。あとPGNXにも登録してみましたので、こちらからも利用できると思います(foreign_table_exposer: Expose foreign tables as a regular table / PostgreSQL Extension Network)。

今後は、以下のような場合に問題が出そうなので、もう少し情報を集めつつ、必要であれば発動条件の制限をしていこうかと思っています。

  • pg_catalog.pg_class.relkind経由でテーブルを列挙し、標準テーブルのみ扱うことを想定し外部テーブルを扱うと問題があるケース

今回初めてPostgreSQLの拡張ライブラリを書いてみたのですが、簡単に色々なことができるので非常に面白かったです。以前、komamitsu/td-fdw · GitHub というMulticornを用いたFDWを書いたのですが、機会があればCで書き直せそうだなぁと思いました。

FluencyというYet another fluent Java loggerを作った話

この記事は、Fluentd Advent Calendar 2015 - Qiita の七日目の記事です。

先日、komamitsu/fluency · GitHub というFluentd (in_forward) 用のJava loggerを作ってみました。

元々、fluent/fluent-logger-java · GitHub の開発を見ていたのですが、Fluentd MLで色々と要望があって機能を拡張したい欲が高まったものの既存の実装的に面倒臭そうな感じだったので、ついカッとなってゼロから書いてしまいました。

特徴

特徴としては以下のものがあります。

  • 通常 (fluent/fluent-logger-java · GitHub) より三倍速い
  • Fluentdへの送信において同期 / 非同期のモードを選択できる
  • 複数のFluentdを指定しfailoverすることが可能
  • Fluentd <-> Fluency間のheartbeatにおいてTCP / UDPの利用を選択できる
  • Fluentdへの送信データフォーマットでMessage / PackedForwardを選択できる
    • 僕が二年程前に当時存在していた各言語のfluent loggerをざっと見た感じ、全てのloggerで実装が容易なMessageフォーマット ([tag, time, event] という単位で1イベント毎に送信) を用いていた気がします
    • Fluentdのout_forwardではPackedForwardフォーマット ([tag, MessagePacked chunk([[time, event], [time, event]], ...])] のようにtag毎に複数のイベントを送信) を使っており、送信側からまとめて送れる。また、Fluentd側ではMessagePackedなchunkをそのままemitできるので性能・CPU負荷的に有利
  • FluentdからのACKを待ちat least onceを実現するモードを有効化 / 無効化できる
    • イベント送信の度にFluentdからのACKを待つので、例えばFluentd側でbuffer fullな場合、それの解消を待つことによりイベントの取り損ね (送り損ね?) を防ぐことができる

以降では内部の作りについて簡単に説明します。

バッファ管理とFluentdへの送信

既存の fluent/fluent-logger-java · GitHub だとイベント追加時に以下を行っています。

  • MessagePackシリアライズされたイベントのデータをメモリ上のバッファにMessageフォーマット ([tag, time, event]) として追加
  • Fluentdのin_forwardに対し、バッファ上のデータをTCPで送信
  • 成功したらバッファクリア、失敗したら次回イベント追加時にまとめて送信

ただ、この方法だとバッファリングされた複数のイベントを送信する際、ACKレスポンス対応が難しい (各イベント毎にACKを待つためバッファ上のMessagePackシリアライズされたデータをデシリアライズして個々のイベントを取得する必要がある) という問題がありました。また、PackedForwardフォーマットには簡単に対応できません。

そこでFluencyではPackedForwardフォーマットの場合、イベント追加時に以下のようにしています (Messageフォーマットはこれより単純なのでここではPackedForwardを例に...)。

f:id:komamitsu:20151207001354j:plain

  • tag毎にバッファを保持しているMap<String, Buffer>に、既に当該tag用のバッファが存在している場合
    • かつ、そのバッファにMessagePackシリアライズされたイベントのデータを書き込む空き容量がある場合は、そのバッファに追記
    • バッファに書き込む空き容量がない場合、指定されている割合でバッファを拡張して追記
  • そもそも当該tag用のバッファが存在しない場合、初期サイズでバッファを割り当てて、そこに書き込む
  • 一定サイズまで拡張されているバッファ、もしくは一定時間フラッシュされていないバッファを送信用のキューに移動 (Mapからはバッファは取り除かれる)

また、自前でバッファプールを持っており、上記処理においてバッファの取得の際はこのプールから取得することでバッファを再利用しています (バッファサイズ拡張時の旧バッファおよびFluentdへのデータ送信完了済みバッファがバッファプールに返却される) 。

Fluentdへのイベントデータ送信時はPackedForwardフォーマットの場合、前述した送信用のキューからtag付きのMessagePackedなchunk ([tag, MessagePacked chunk([[time, event], [time, event]], ...])]) を取り出して送信します。複数のイベントを一括で送信できる上、Fluentd側では受け取ったchunkをそのままemitできるので速度 / CPU負荷ともに効率的です。

同期送信モードの場合はイベント追加APIを呼び出したスレッドで、非同期送信モードの場合は専用スレッドにて上記のイベントデータ送信処理を行います。後者の場合、送信処理で詰まってもイベント追加API呼び出しではブロックされません。

ちなみに、Messageフォーマット ([tag, time, event]) の場合は各イベント毎に送信キューに入れています。PackedForwardに比べると、こちらはシンプルですので詳細は省略します。

Fluentdの死活監視

Fluentdのin_forwardに対しては、UDPまたはTCPにてheartbeatを投げ死活監視を行っています。

heartbeatの失敗 / 遅延に対する判断ロジックについては、元々Fluentdのout_forward <-> in_forward間で ϕ Accrual Failure Detector アルゴリズムが用いられていたため、これをJavaで実装した komamitsu/phi-accural-failure-detector · GitHub を使って実現しています。とはいえ、Fluentdでこのアルゴリズムを使うのはやり過ぎな感もあるため、今後よりシンプルなロジックに差し代わる可能性もありそうです。

また、heartbeatにより利用不可と判断された場合はfailoverとして、secondaryのFluentdを利用します。ただし、利用不可と判断されたprimaryのFluentdが復帰し一定時間経った場合は、再度そちらを利用します。

ACKレスポンス対応

前述したバッファ管理にて説明した通り、送信データ単位 (Messageフォーマットの場合: [tag, time, event], PackedForwardフォーマットの場合: [tag, MessagePacked chunk([[time, event], [time, event]], ...])]) で送信キューに入れていることにより、ACKレスポンスの対応が容易になっています。

具体的にはACKレスポンスモードの場合、各データの末尾にUUIDを付与しFluentdからのレスポンスに同値が含まれていることを確認するまでブロッキング (次のデータを送信しない) することでat least onceを実現しています。ACKを待つために性能への影響は当然0ではありませんが、大量のイベントをまとめて送信するPackedForwardフォーマットとの併用により、性能にそれほど影響を与えずかつat least onceを実現できるかと思います。

活用事例

Fluencyは既に大量のデータ送信が想定されている treasure-data/kafka-fluentd-consumer · GitHub に組み込まれたりしているようです。気軽に使える
fluent/fluent-logger-java · GitHub、タフな環境でも使える komamitsu/fluency · GitHub という使い分けは良さそうな気がするので、Fluentdのfailoverが必要であったり、大量のデータ送信が行われるような環境であれば是非お試し頂いてフィードバックを頂けたら幸いです。

今後の予定

リトライ機能や複数のFluentdを指定しfailoverすることが可能であるため、それ以外のエラーハンドリングには現状力を入れていませんでした。エラー通知用のコールバック等、機能追加していこうかと思います。また、ファイルバッファリングを実装しさらに冗長性を高めるのもありかなぁ...等々

PrestoのPlan

Prestoのコードを眺めていて、com.facebook.presto.execution.SqlQueryExecution#doAnalyzeQuery内で生成されるPlanを見てみたかったのでPrestoのJVMをアタッチしてPlanクラスの中を覗いてみた。

対象のクエリはこちら(catalogはtpch)。

select c.nationkey, count(1) from orders o 
join customer c on o.custkey = c.custkey
where o.orderpriority = '1-URGENT' group by c.nationkey

Planの結果はこれ。
f:id:komamitsu:20150405233413p:plain

Subplanを経由してのStageExecutionPlan:
f:id:komamitsu:20150426235803p:plain

MessagePack v07とJacksonの文字列を対象とした性能比較

        int cnt = 800000;

        {
            ObjectMapper mapper = new ObjectMapper();
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            List<String> strList = new ArrayList<String>(cnt);
            for (int i = 0; i < cnt; i++) {
                strList.add("01234567");
            }

            System.gc();
            Thread.sleep(3000);

            long start = System.currentTimeMillis();
            mapper.writeValue(outputStream, strList);
            System.out.println("Jackson(Serialize, String): " + (System.currentTimeMillis() - start) + "ms");

            System.gc();
            Thread.sleep(3000);

            start = System.currentTimeMillis();
            mapper.readValue(outputStream.toByteArray(), new TypeReference<List<String>>() {});
            System.out.println("Jackson(Deserialize, String): " + (System.currentTimeMillis() - start) + "ms");
        }

        {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            MessagePacker packer = org.msgpack.core.MessagePack.newDefaultPacker(outputStream);

            System.gc();
            Thread.sleep(3000);

            long start = System.currentTimeMillis();
            for (int i = 0; i < cnt; i++) {
                packer.packString("01234567");
            }
            packer.flush();

            System.out.println("MessagePack(Serialize, String): " + (System.currentTimeMillis() - start) + "ms");

            MessageUnpacker unpacker = org.msgpack.core.MessagePack.newDefaultUnpacker(outputStream.toByteArray());

            System.gc();
            Thread.sleep(3000);

            start = System.currentTimeMillis();
            for (int i = 0; i < cnt; i++) {
                unpacker.unpackString();
            }
            System.out.println("MessagePack(Deserialize, String): " + (System.currentTimeMillis() - start) + "ms");
        }

int cnt = 1000000;

Jackson(Serialize, String): 71ms
Jackson(Deserialize, String): 60ms
MessagePack(Serialize, String): 96ms
MessagePack(Deserialize, String): 82ms

int cnt = 1000000;

Jackson(Serialize, String): 103ms
Jackson(Deserialize, String): 246ms
MessagePack(Serialize, String): 192ms
MessagePack(Deserialize, String): 158ms

int cnt = 10000000;

Jackson(Serialize, String): 574ms
Jackson(Deserialize, String): 7259ms
MessagePack(Serialize, String): 1316ms
MessagePack(Deserialize, String): 1049ms

PostgreSQLのWITH RECURSIVE練習でフィボナッチ数列

PostgreSQLにはWITH RECURSIVE句というものがあって、これを使うと再帰的な問い合わせが可能らしい。で、一度も使ったことがなかったのでちょっと試しにフィボナッチ数列を生成してみた。

with recursive r(a, b) as (
  select 0::int, 1::int
  union all
  select b, a + b from r where b < 1000
)
select a from r;

  a
-----
   0
   1
   1
   2
   3
   5
   8
  13
  21
  34
  55
  89
 144
 233
 377
 610
 987
(17 rows)

RubyのEnumerable#injectやOCamlのList.fold_leftみたいに最初のSELECT文で初期値を生成、UNION ALL(またはUNION)の後ろに、accumulatorを出力するSELECT文を書けば良さげ(で、ここに終了条件も含む)。

ByteBufferへのコピー速度比較

JavaでByteBufferへのコピーを行う際の速度を簡単に比較してみました。java.nio.ByteBuffer#put(byte)で1byteずつ書き込んでいくのでなければ大体同じですね。

Heap上のbyte arrayをコピー

java.nio.ByteBuffer#put(byte[], int, int)

        byte[] bs = new byte[256 * 1024 * 1024];
        ByteBuffer src = ByteBuffer.wrap(bs);
        ByteBuffer dst = ByteBuffer.allocate(bs.length);
        long start = System.currentTimeMillis();


        dst.put(src.array(), 0, src.remaining());
        System.out.println("heap.byte_array: ByteBuffer.put(byte[], int, int): " + (System.currentTimeMillis() - start));

=> heap.byte_array: ByteBuffer.put(byte[], int, int): 73

java.nio.ByteBuffer#put(java.nio.ByteBuffer)

        dst.put(src);
        System.out.println("heap.byte_array: ByteBuffer.put(java.nio.ByteBuffer): " + (System.currentTimeMillis() - start));

=> heap.byte_array: ByteBuffer.put(java.nio.ByteBuffer): 69

java.nio.ByteBuffer#put(byte) with loop

        for (int i = 0; i < src.remaining(); i++)
            dst.put(src.get());
        System.out.println("heap.byte_array: ByteBuffer.put(byte): " + (System.currentTimeMillis() - start));

=> heap.byte_array: ByteBuffer.put(byte): 380

Heap上のByteBufferをコピー

java.nio.ByteBuffer#put(byte[], int, int)

        ByteBuffer src = ByteBuffer.allocate(256 * 1024 * 1024);
        ByteBuffer dst = ByteBuffer.allocate(256 * 1024 * 1024);
        long start = System.currentTimeMillis();

        dst.put(src.array(), 0, src.remaining());
        System.out.println("heap.bb: ByteBuffer.put(byte[], int, int): " + (System.currentTimeMillis() - start));

=> heap.bb: ByteBuffer.put(byte[], int, int): 72

java.nio.ByteBuffer#put(java.nio.ByteBuffer)

        dst.put(src);
        System.out.println("heap.bb: ByteBuffer.put(java.nio.ByteBuffer): " + (System.currentTimeMillis() - start));

=> heap.bb: ByteBuffer.put(java.nio.ByteBuffer): 70

java.nio.ByteBuffer#put(byte) with loop

        for (int i = 0; i < src.remaining(); i++)
            dst.put(src.get());
        System.out.println("heap.bb: ByteBuffer.put(byte): " + (System.currentTimeMillis() - start));

Off heap上のByteBufferをコピー

java.nio.ByteBuffer#put(byte[], int, int)

        ByteBuffer src = ByteBuffer.allocateDirect(256 * 1024 * 1024);
        ByteBuffer dst = ByteBuffer.allocate(256 * 1024 * 1024);
        long start = System.currentTimeMillis();

        dst.put(src.array(), 0, src.remaining());
        System.out.println("off-heap.bb: ByteBuffer.put(byte[], int, int): " + (System.currentTimeMillis() - start));

=> java.lang.UnsupportedOperationException (it doesn't have array inside)

java.nio.ByteBuffer#put(java.nio.ByteBuffer)

        dst.put(src);
        System.out.println("off-heap.bb: ByteBuffer.put(java.nio.ByteBuffer): " + (System.currentTimeMillis() - start));

=> off-heap.bb: ByteBuffer.put(java.nio.ByteBuffer): 72

java.nio.ByteBuffer#put(byte) with loop

        for (int i = 0; i < src.remaining(); i++)
            dst.put(src.get());
        System.out.println("off-heap.bb: ByteBuffer.put(byte): " + (System.currentTimeMillis() - start));

=> off-heap.bb: ByteBuffer.put(byte): 358