CUDA10.2 の Virtual Memory Management 機能を試してみる

11月21日にリリースされた CUDA 10.2 の Release Note を読んでみると、さらっと『Added support for CUDA Virtual Memory Management APIs.』という一文が。

以前から、ManagedなGPUバイスメモリをマルチプロセスで共有できるようにしてほしいと、機能リクエストを上げていた事もあり、これがそれに該当するのかね?と、少し手を動かして試してみた。

忙しい人のために、先に結論だけ書くと、現状のCUDA Toolkit 10.2の対応範囲では、既存のcuIpcGetMemHandle()cuIpcOpenMemHandle()で実現できる機能に差はないものの、APIの仕様・拡張可能性としては、将来的にManaged Device Memoryをマルチプロセスで共有するための布石であるようにも見えるので、引き続き注視していきたい。

なお、試行錯誤の跡(サンプルプログラム)はこちら。
https://github.com/heterodb/toybox/tree/master/gpu_mmap

背景 - なぜManaged Device Memoryをマルチプロセスで共有したいか

JOIN処理をGpuJoinで実行する場合、基本的には (N-1) 個の比較的小さな(GB単位の)テーブルと、1個の巨大なテーブル(数十GB以上~)との結合というのが基本線になる。これはマスタ表とファクト表の結合というのを念頭に置いた設計であるが、説明の簡単のため、左の小さいテーブル群をInner側、右の大きなテーブルをOuter側と呼ぶ。

GpuJoinが内部的に使っている Hash Join のアルゴリズム上、Outer側を読み出しながら少しずつ処理を進めていく事はできるのだが、少なくとも Inner 側はオンメモリに、しかもGPUを使用するのでデバイスメモリに載せておく必要がある。*1

そうすると、Inner側でオンメモリにできるデータサイズは自ずとGPUの搭載メモリ容量に規定されてしまうという事がまず前提としてある。難しいのは、ここでInner側のバッファに載せる必要があるのは、Inner側テーブルを条件句付きでスキャンした結果であるので、実行計画では1,000行だと思っていたのに、実際にデータを読んでみたら百万行ありましたという可能性もある事。つまり、必ずしもオプティマイザで弾ける訳ではない。

で、もう一つ考慮すべき事項として、PostgreSQL v9.6で並列クエリがサポートされた事。
これにより、通常のバックエンドプロセスに加えて、ワーカープロセスが、並列に全く同一のGpuJoinを実行する可能性を考慮する必要が出てきたが、これを CUDA から見ると、互いに独立な複数のプロセスがGPUメモリを奪い合いながらGPU kernelを動かしているように見える。

はっきり言えば、デバイスメモリ制限の厳しい GpuJoin ワークロードにとっては最悪である。並列で動作する各プロセス毎にデバイスメモリを確保に走れば、あっという間にデバイスメモリが枯渇してしまわないとも限らない。
そこで現在のPG-Stromでは、Inner側バッファの内容を一個だけ作り、それをcuIpcGetMemHandle()cuIpcOpenMemHandle()を使って各ワーカープロセスと共有する。Inner側バッファの内容は一度作ってしまえばRead-Onlyなので、デバイスメモリの消費量はほぼ変わらない。

ただし、これらIPC系の機能を使えるのはcuMemAllocで獲得したメモリ、即ち、cuMemAlloc呼び出しの時点で実際にGPUバイスメモリの物理ページが割り当てられ、決してPage-outしない代わりにOver-subscriptionも不可という制限がある。
どういう事かというと、デバイスメモリの Over-subscription が不能であるために『あと数十MBあればInner側バッファをデバイスメモリに載せられたのに』という状況で涙を飲むしかなくなる。

一方、これが Managed Device Memory と呼ばれる領域であれば、アロケーション時点では仮想アドレス空間だけを確保しておき、プログラムが実際にその領域をアクセスした時点でページを割り当て、必要があればホストメモリ上のバッファとスワップする事で処理を継続する事もできる。

kaigai.hatenablog.com

が、その場合、前述のIPC系機能は対応していないため、マルチプロセス下でGpuJoinのInner側バッファとして利用する場合には、同じデータを重複して何ヶ所にも持つ事が必要となり無駄が多い。あっちを立てれば、こっちが立たず。

全体的な流れ

CUDA Driver APIVirtual Memory Managementの章を読んでみると、全体的な流れは以下のような形である事がわかる。

まず、デバイスメモリをエクスポートする側の流れ:

  1. cuMemCreate()を用いて、memory allocation handleを作成する。この人が物理的に割り当てられたメモリ領域の実体になる。
    • OSで言えば、shm_open(3)fallocate(2)みたいな感じ
  2. cuMemExportToShareableHandle()を用いて、memory allocation handleを指し示すユニークな識別子であるshareable-handleに変換する。
    • shareable-handleはOS種類によって実体が異なっていて、Linuxであればfile-descriptorが、WindowsであればHANDLEだと記載がある。
    • OSで言えば、shm_open(3)した時のパス名を取り出すようなAPIとでも言うべきか。

次に、デバイスメモリをインポートする側の流れ

  1. cuMemImportFromShareableHandle()を使って、上記のshareable-handleを、ローカルなmemory allocation handleに変換する。
    • OSで言えば、他のプロセスから渡された共有メモリのパス名を使ってopen(2)するようなもの。
  2. cuMemAddressReserve()を使って、このデバイスメモリをマッピングする仮想アドレス領域を予めリザーブしておく。
  3. cuMemMap()を使って、手順(1)で取得した memory allocation handle を、手順(2)で確保したアドレス空間にマップする。
  4. cuMemSetAccess()を使って、上記アドレス空間のアクセス許可を変更する。Read-onlyとRead-Writableの2種類が設定可
    • OSで言えば、上記の3ステップはmmap(2)が一度に実行する事になる。

なお、cuMemCreate()を使ってデバイスメモリを作成したプロセス自身がそれをマップして使いたい場合は、cuMemExportToShareableHandle()cuMemImportFromShareableHandle()を使うまでもなく、ローカルのmemory allocation handleを持っているので、単純にcuMemAddressReserve()cuMemMap()cuMemSetAccess()を呼び出せばよい。

注目すべきAPI:cuMemCreate()

gpu_mmap.c:L304で呼び出しているcuMemCreateの定義は以下のようになっている。

CUresult cuMemCreate ( CUmemGenericAllocationHandle* handle, size_t size, const CUmemAllocationProp* prop, unsigned long long flags);

第一引数は処理結果の memory allocation handle を返すポインタで、第二引数はサイズ。第三引数に、どういったメモリが欲しいのかという属性をセットする。

CUmemAllocationProp構造体の定義は以下のようになっており、Linuxで関係するのは最初の3つだけ。

typedef struct CUmemAllocationProp_st {
    /** Allocation type */
    CUmemAllocationType type;
    /** requested ::CUmemAllocationHandleType */
    CUmemAllocationHandleType requestedHandleTypes;
    /** Location of allocation */
    CUmemLocation location;
    /**
     * Windows-specific LPSECURITYATTRIBUTES required when
     * ::CU_MEM_HANDLE_TYPE_WIN32 is specified.  This security attribute defines
     * the scope of which exported allocations may be tranferred to other
     * processes.  In all other cases, this field is required to be zero.
     */
    void *win32HandleMetaData;
    /** Reserved for future use, must be zero */
    unsigned long long reserved;
} CUmemAllocationProp;

で、CUmemAllocationTypeを定義を確認してみると、CUDA 10.2時点で有効なのはCU_MEM_ALLOCATION_TYPE_PINNEDのみ。コメントを読むと、要はcuMemAllocと同じようにふるまうという事である。*2

typedef enum CUmemAllocationType_enum {
    CU_MEM_ALLOCATION_TYPE_INVALID = 0x0,

    /** This allocation type is 'pinned', i.e. cannot migrate from its current
      * location while the application is actively using it
      */
    CU_MEM_ALLOCATION_TYPE_PINNED  = 0x1,
    CU_MEM_ALLOCATION_TYPE_MAX     = 0xFFFFFFFF
} CUmemAllocationType;

次にCUmemAllocationHandleTypeを確認すると、これは、OS上で交換可能な shareable-handle をどのように表現するかという話で、LinuxであればCU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTORを選択し、識別子はintの幅を持つとされる。

typedef enum CUmemAllocationHandleType_enum {
    /**< Allows a file descriptor to be used for exporting. Permitted only on POSIX systems. (int) */
    CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR = 0x1,
    /**< Allows a Win32 NT handle to be used for exporting. (HANDLE) */
    CU_MEM_HANDLE_TYPE_WIN32                 = 0x2,
    /**< Allows a Win32 KMT handle to be used for exporting. (D3DKMT_HANDLE) */
    CU_MEM_HANDLE_TYPE_WIN32_KMT             = 0x4,
    CU_MEM_HANDLE_TYPE_MAX                   = 0xFFFFFFFF
} CUmemAllocationHandleType;

三番目のCUmemLocationの定義は以下の通り。要はCU_MEM_LOCATION_TYPE_DEVICEを指定して、何番目のGPUでメモリを確保しますかという事になり、現状ではデバイスメモリ以外の選択肢はない。

typedef enum CUmemLocationType_enum {
    CU_MEM_LOCATION_TYPE_INVALID = 0x0,
    /**< Location is a device location, thus id is a device ordinal */
    CU_MEM_LOCATION_TYPE_DEVICE  = 0x1,
    CU_MEM_LOCATION_TYPE_MAX     = 0xFFFFFFFF
} CUmemLocationType;

typedef struct CUmemLocation_st {
    /**< Specifies the location type, which modifies the meaning of id. */
    CUmemLocationType type;
    /**< identifier for a given this location's ::CUmemLocationType. */
    int id;
} CUmemLocation;

今回、新たに追加されたAPIではあるが、こうして見てみるとCUmemAllocationPropのフラグを変えるだけで、将来的にはManaged Device Memoryのエクスポートとマルチプロセスでの共有も可能にできるようなAPIの切り方をしている。

それを強く感じるのが、次のcuMemAddressReserveである。

注目すべきAPI:cuMemAddressReserve()

CUDAのManaged Memoryのもう一つ大きな特徴として、Over-subscriptionが可能である以外にも、Host側とDevice側で同一の仮想アドレスを利用して、Page Faultをうまくハンドリングする事で、物理ページがHost側なのかDevice側なのか意識せずに(裏でデータ転送が動いている)プログラムを書けるという事があった。

しかし、これをマルチプロセスに拡張するとなると、あるプロセスAでcuMemAllocManagedを呼び出して確保したメモリの仮想アドレスが、他のプロセスBでは既に別の用途に利用されており、そうすると「Host側とDevice側で同一の仮想アドレス」という前提が損なわれるという問題があった。

おそらく、cuMemMapに先立って、わざわざ仮想アドレス空間を予約するAPIを呼び出させるという事は、『Managed Device Memoryをマルチプロセスで共有できるようになっても、当該メモリを共有する全てのプロセスで同じ仮想アドレスに揃うよう保証する事はできないので、アプリケーション側で予め仮想アドレス空間を予約して他が使えないように抑えておいてくれ』というNVIDIAからのメッセージではないだろうか?

OS上の共有メモリでも、別プロセスに貼り付けた時に同一の仮想アドレスを取れるようPROT_NONEmmapしておくというのはちょくちょく使うテクニックなので、まぁ、そういう事なんだろうと思う。*3
kaigai.hatenablog.com

注目すべきAPI:cuMemImportFromShareableHandle()

コレは注目すべきというか、注意すべきAPI

Linuxの場合、shareable-handleとして使われるのは file descriptor と確かに書いてあったが、当初、cuMemExportToShareableHandleで返ってきた値を、そのまま別プロセスでcuMemImportFromShareableHandleに渡せばよいと思って2~3時間ハマった(汗

これは当然と言えば当然だが、エクスポート元のプロセスにとっての file-descriptor = XX は、エクスポート先のプロセスにとっての file-descriptor = XX ではないので、UNIXドメインソケットとsendmsg(2)/recvmsg(2)にSCM_RIGHTSを使って、file-descriptorの複製を作ってやる必要がある。

gpu_mmap.c:L98以降の一連の処理は、その辺のファイルディスクリプタの受け渡しに関するもの。
折角なので、バッファのサイズとGPUのデバイスIDをメッセージに載せて渡すようにしている。

注目すべきAPI:cuMemSetAccess()

cuMemMapでデバイスメモリをマップするだけでは、この領域に対するアクセス権が無いのでCUDA_ERROR_ILLEGAL_ADDRESSエラーになってしまう。
なので、CU_MEM_ACCESS_FLAGS_PROT_READCU_MEM_ACCESS_FLAGS_PROT_READWRITEを指定する必要がある。

当初、Over-subscriptionが使えないなら、せめてInner側バッファをread-writableで作成し、その後read-onlyに変えるようにしたら、不慮の事故でメモリ破壊を防げる的な利点はあるかと思ったが…。

↓インポート側プロセスでCU_MEM_ACCESS_FLAGS_PROT_READを指定した場合。

[kaigai@magro gpu_mmap]$ ./gpu_mmap
total sum [master]: 6420001848.309442
gpu_mmap.c:259  failed on cuMemSetAccess: CUDA_ERROR_INVALID_VALUE
gpu_mmap.c:259  failed on cuMemSetAccess: CUDA_ERROR_INVALID_VALUE

CU_MEM_ACCESS_FLAGS_PROT_READWRITEを指定しないと怒られました。Orz...

結論

サンプルプログラムは、エクスポート側プロセスでデバイスメモリを確保し、それを(なんちゃって)ランダムな値で埋めたあと、インポート側プロセスでそれをマップし、それぞれ合計値を計算するだけのGPUカーネルを実行するというもの。

このように、明示的なデータのコピーは無いものの、同じメモリ領域に対して同じ集計演算を実行した結果、同じ結果を出力している。(浮動小数点同士の加算なので、集計のタイミングによって多少の誤差が出るのは仕方ない)

[kaigai@magro gpu_mmap]$ ./gpu_mmap
total sum [master]: 6420001848.309442
total sum [17767]: 6420001848.309443
total sum [17768]: 6420001848.309442

これにより、CUDA 10.2で追加されたVirtual Memory Management機能により、GPUのデバイスメモリを複数のプロセスで共有できる事が確認できた。

駄菓子菓子。現状では Managed Device Memory に対応しておらず、できる事は既存のcuMemAlloccuIpcGetMemHandle と変わらない。ファイルディスクリプタの受け渡しなど、面倒な要素はあるので、今の時点で積極的に利用する必要性はない。

ただし、おそらく今後の CUDA Toolkit のバージョンアップの中で、どこかのタイミングで Managed Device Memory への対応が(ひっそりと)行われるであろう。その時になって、慌ててアプリケーションの大規模な改修を行わずに済むよう、頭の片隅に当該API群の事を留め置いてもよいだろう。

*1:厳密には必ずしも真ではないが、その場合の性能ペナルティが激しいので、PG-Stromでは選択肢とはしていない。

*2:ここで、『おぅ、マジか…。』となる

*3:なお、下記のエントリの仕組みは今では使っていませんが、PG-Stromの deadcode/ 以下に当時の残骸が残っています

Billion rows processed per second at a single-node PostgreSQL

I have worked on benchmarking of PG-Strom at a large hardware configuration for a couple of months.
Due to the server models we had, our benchmark results had been usually measured at a small 1U rack server with 1CPU, 1GPU and 3-4 NVME-SSDs, even though PG-Strom supports multi-GPUs and striping of NVME-SSDs.
So, we determined to arrange more powerful hardware environment for a validation of maximum performance of PG-Strom a few months before. After that, we could confirm billion-rows processed per second performance at a single-node PostgreSQL system using Star Schema Benchmark (SSBM), as we have expected.
This article introduces the technology briefs, benchmark results and expected applications.

Hardware configuration for the benchmark

The diagram below shows the hardware configuration we prepared for the benchmark project.

The 4U-rack server (Supermicro SYS-4029GP-TRT) is a certified model for NVIDIA Tesla V100 GPUs *1, and optimized for NVIDIA GPUDirect RDMA by PCIe-switch.
PCIe-switch has two purpose here. It enables to install more device than host system’s PCIe lanes *2, and also enables to bypass CPU on peer-to-peer data transfer between PCIe devices under the PCIe-switch.
It is an ideal configuration for us, because PG-Strom fully utilize the P2P data transfer on SSD-to-GPU Direct SQL mode, and it is the key of performance.
U.2 NVME-SSD drives (Intel DC P4510) are installed on the external JBOF enclosures, and connected to the host system using PCIe host cards and direct attach cables *3. This host card is designed to install on PCIe x16 slot, and capable to connect four NVME-SSD drives that support PCIe x4 bandwidth. So, data transfer bandwidth over the PCIe-bus shall balance on 1xGPU and 4xSSDs.
Below is the block diagram of our benchmark environment.


Element technology① - SSD-to-GPU Direct SQL

It is a characteristic feature of PG-Strom. By co-operation with a special Linux kernel driver (nvme_strom) that intermediates peer-to-peer DMA from NVME-SSD to GPU over PCIe-bus, PG-Strom can directly load PostgreSQL data blocks on NVME-SSD drives onto GPU’s device memory, but CPU/RAM is bypassed.

Usually, software cannot determine which items are necessary and which ones are junks, prior to data blocks are loaded to system RAM. On the other word, we consume I/O bandwidth and CPU cycles to copy junk data.
SSD-to-GPU Direct SQL changes the data flow. GPU pre-processes SQL workloads on the middle of I/O path to eliminate unnecessary rows and runs JOIN/GROUP BY steps. So, CPU/RAM eventually receives just a small portion of the result set from the GPU.

Element technology② - Arrow_Fdw

Apache Arrow is a column-oriented data format for structured data-set, and many applications (especially, big-data and data analytics area) support it for data exchange. PG-Strom supports to read Apache Arrow files as its columnar store, in addition to PostgreSQL’s row data blocks.

Due to the nature of data format, columnar-data enables to read only referenced columns, unlike row-data. It usually reduce amount of I/O to be read from the storage.
Arrow_Fdw is designed for direct read from Apache Arrow files without data importing to database system. It means we can eliminate one time-consuming steps, if conprehensive software generates imput data in Apache Arrow format. It is a suitable characteristic for IoT/M2M log processing system that usually generates tons of data to be loaded to data processing phase.

Star Schema Benchmark and PostgreSQL partition

As our usual, we used Star Schema Benchmark (SSBM) for performance measurement.
Its scaling factor of lineorder is SF=4000, then it shall be distributed to four partition-leafs using hash-partition of PostgreSQL.
So, individual partition-leafs has about 879GB (6 billion rows) row-data, and total size of lineorder is about 3.5TB.

We also set up equivalent Apache Arrow files for each partition leaf, and set up one another partition table that is consists of foreign-tables on Arrow_Fdw.

SSBM defines 13 queries that containes a scan on lineorder that is the largest portion and JOIN / GROUP BY. For example, the query below is Q2_3.

select sum(lo_revenue), d_year, p_brand1
  from lineorder, date1, part, supplier
  where lo_orderdate = d_datekey
    and lo_partkey = p_partkey
    and lo_suppkey = s_suppkey
     and p_brand1 = 'MFGR#2221'
     and s_region = 'EUROPE'
  group by d_year, p_brand1
  order by d_year, p_brand1;

Benchmark Results

Below is the benchmark result. The vertical axis means number of rows processed per second.
Its definition is (number of lineorder; 24billion rows) / (SQL response time).
For example, PG-Strom + Arrow_Fdw responds the result of Q1_2 within 14.0s, so it means 1.71 billion rows were processed per second.

The blue bar is PostgreSQL v11.5 with a manual tuning to launch 24 parallel workers *4. Its performance was about 50-60 million rows per second.

The orange bar is PG-Strom on row-data of PostgreSQL by SSD-to-GPU Direct SQL. Its performance was about 250 million rows per second.

These two series has little variation over the 13 queries, because i/o dominates the overall workloads rather than JOIN / GROUP BY, thus equivalent amount of storage i/o led almost equivalent performance.

The green bar is PG-Strom + Arrow_Fdw with SSD-to-GPU Direct SQL, is the topmost performance.
Due to the nature of columnar data, query performance was variable according to the number of referenced columns.
Regarding of the group of Q1_* and Q2_*, we could validate the performance more than billion rows processed per second.

The graph above is time-seriesed result of iostat.
It shows PG-Strom could load the data from 16x NVME-SSD drives in 40GB/s during the query execution.
Here are 13 mountains because SSBM defines 13 queries.

Conclusion

This benchmark results shows that a proper configuration of hardware and software can process reporting and analytics queries on terabytes grade data-set more than billion rows per second performance.
This grade of performance will change our assumption towards system architecture to process "big-data".
People have usually applied cluster system without any choice, however, accelerated PostgreSQL with GPU + NVME can be an alternative choice for them.

We think our primary target is log-data processing at IoT/M2M area where many devices generate data day-by-day. The raw data is usually huge for visualization or machine-learning, so needs to be summarized first prior to other valuable tasks.
A single node configuration makes system administration much simpler than cluster-based solution, and PostgreSQL is a long-standing software thus many engineers are already familiar with.

Appendix - hardware components.

This parts list is just for your reference

Parts Qty
model Supermicro SYS-4029GP-TRT 1
CPU Intel Xeon Gold 6226 (12C, 2.7GHz) 2
RAM 16GB DIMM (DDR4-2933; ECC) 12
GPU NVIDIA Tesla V100 (PCI-E; 16GB) 2
GPU NVIDIA Tesla V100 (PCI-E; 32GB) 2
HDD Seagate 2.5inch 1.0TB/2.0TB 4
JBOF SerialCables PCI-ENC8G-08A 2
SSD Intel DC P4510 (U.2; 1.0TB) 16
HBA SerialCables PCI-AD-x16HE-M 4

*1:This model is capable to install up to 8 GPUs

*2:Xeon Gold 6226 [Cascade Lake] has 48lanes

*3:Supermicro does not support to replace the internal storage backplane to the product that support NVME.

*4:10 workers were launched in the default, so it means 2-3 workers per partition were assigned. It is too small and cannot pull out i/o capability because CPU usage ratio was 100% during the execution.

秒速で10億レコードを処理する話

これまでのPG-Stromの性能測定といえば、自社保有機材の関係もあり、基本的には1Uラックサーバに1CPU、1GPU、3~4台のNVME-SSDを載せた構成のハードウェアが中心だった。*1
ただソフトウェア的にはマルチGPUやNVME-SSDのストライピングに対応しており、能力的にどこまで伸ばせるのかというのは気になるところである。
そこで、方々に手を尽くして、次のようなベンチマーク環境を整備してみた。
(機材をお貸し頂いたパートナー様には感謝感激雨あられである)

4UサーバのSYS-4029GP-TRTというモデルは、GPUをたくさん乗っけるためにPCIeスイッチを用いてPCIeスロットを分岐している。ちょうど、PCIeスイッチ1個あたり2個のPCIe x16スロットが用意されており、同じPCIeスイッチ配下のデバイス同士であれば、完全にCPUをバイパスしてPeer-to-Peerのデータ転送ができる。Supermicro自身も、このサーバを"GPUDirect RDMAに最適化"したモデルとして売っている。

こういう構造のサーバであるので、P2P DMAを用いてSSDからGPUへデータを転送する場合、PCIeスイッチの配下にある2本のPCIeスロットにそれぞれSSDGPUをペアにして装着すると、データ転送の時に効率が良い。

U.2のNVME-SSDを装着するには外部のエンクロージャが必要で、今回は(色々あって)SerialCables社のPCI-ENC8G-08Aという製品を使用した。これはエンクロージャあたり8本のU.2 NVME-SSDを装着する事ができ、ダイレクトアタッチケーブルを使用して各2枚のPCIeホストカード(PCI-AD-x16HE-M)と接続する。

そうすると、GPU 1台あたりU.2 NVME-SSDを4台のペアを構成する事ができ、それぞれPCIe Gen3.0 x16レーン幅の帯域でCPUをバイパスして直結できるようになる。
ブロックダイアグラムにして書き直すと以下の通り。


Star Schema Benchmarkとデータセット

性能評価に使ったのはいつもの Star Schema Benchmark(SSBM) で、SF=4000で作ったデータセットPostgreSQLのHashパーティショニング機能を使って4つのユニットにデータを分散させる。つまり、U.2 NVME-SSDを4台あたりSF=1000相当の規模のデータ(60億件、879GB)を持つことになる。

さらにデータの持ち方も、PostgreSQLの行形式に加えて、PG-Stromの列ストアであるApache Arrow形式で全く同じ内容のファイルを用意して、各パーティションへ配置した。

Apache Arrow形式というのは構造化データを列形式で保存・交換するためのフォーマットで、PG-StromにおいてはArrow_Fdw機能を用いての直接読み出し(SSD-to-GPU Direct SQLを含む)に対応している。詳しくはこちらのエントリーなど。

kaigai.hatenablog.com

SSBMには全部で13種類のクエリが含まれており、例えば、Q2_3のクエリは以下の通り。
データ量の多いlineorderのスキャンと同時に、他のテーブルとのJOINやGROUP BYを含むワークロードである。

select sum(lo_revenue), d_year, p_brand1
  from lineorder, date1, part, supplier
  where lo_orderdate = d_datekey
    and lo_partkey = p_partkey
    and lo_suppkey = s_suppkey
     and p_brand1 = 'MFGR#2221'
     and s_region = 'EUROPE'
  group by d_year, p_brand1
  order by d_year, p_brand1;

ベンチマーク結果

という訳で、早速ベンチマーク結果を以下に。
縦軸は単位時間あたりの処理行数で、(lineorderの行数;240億行)÷(SQL応答時間)で計算している。例えば PG-Strom 2.2 + Arrow_Fdw における Q1_2 の応答時間は 14.0s なので、毎秒17.1億行を処理しているということになる。

青軸は PostgreSQL v11.5 で、並列クエリ数を24に引き上げるというチューニング*2を行っている。性能値としては、毎秒50~60百万行の水準。

オレンジの軸は、PostgreSQLの行データに対して PG-Strom v2.2 のSSD-to-GPU Direct SQLを使用してクエリを実行したもの。性能値としては、毎秒250百万行前後の水準。

この二つに関しては、13個のクエリの間で性能値に大きな傾向の差がない。これは、JOIN/GROUP BYの処理負荷よりも、まずI/Oの負荷が支配項になっており、行データである限りは参照されていない列も含めてストレージから読み出さねばならないためだと考えられる。

緑の軸が真打で、PG-Strom v2.2のSSD-to-GPU Direct SQLをArrow_Fdw管理下の列ストアに対して実行したもの。
これは列データなので、参照するカラムの数によって大きく性能値の傾向が違っている様子が見えるが、Q1_*のグループ、Q2_*のグループに関しては、目標としていた毎秒10億行の処理能力を実証できたことになる。

一応、4xGPU + 16xNVME-SSD できちんとI/Oの性能が出ているという事を確認するために、クエリ実行中の iostat の結果を積み上げグラフにしてみた。山が13個あるのはクエリを13回実行したという事で、物理的には概ね40GB/sのSeqRead性能が出ている事がわかる。(つまり、クエリ応答性能の違いは参照している列の数による、という事でもある)

参考までに、今回の構成は以下の通り。

型番 数量
model Supermicro SYS-4029GP-TRT 1
CPU Intel Xeon Gold 6226 (12C, 2.7GHz) 2
RAM 16GB (DDR4-2933; ECC) 12
GPU NVIDIA Tesla V100 (PCI-E; 16GB) 2
GPU NVIDIA Tesla V100 (PCI-E; 32GB) 2
HDD Seagate 2.5inch 1.0TB/2.0TB 4
JBOF SerialCables PCI-ENC8G-08A 2
SSD Intel DC P4510 (U.2; 1.0TB) 16
HBA SerialCables PCI-AD-x16HE-M 4

ご自分でも環境を作ってみたいという方はご参考に。

PostgreSQL Conference Japan 2019

今回の一連の検証結果に関しては、来る11月15日(金)に品川駅前(AP品川)で開催予定の PostgreSQL Conference Japan 2019 にて『PostgreSQLだってビッグデータ処理したい!! ~GPUとNVMEを駆使して毎秒10億レコードを処理する技術~』と題して発表を行います。
有償でのカンファレンスではありますが、私の発表の他にも、経験豊富なPostgreSQLエンジニアによる14のセッション/チュートリアルが予定されており、ぜひご参加いただければと思います。

www.postgresql.jp

*1:例外としては、PGconf.ASIA 2018などに向けて、NEC様の協力でExpEtherを3台、3xGPU + 6xSSDの構成を作って13.5GB/sを記録したもの。

*2:デフォルトだとnworkers=10程度、すなわちパーティションあたり2~3のワーカーとなり、CPU100%で貼り付いてしまうため

Asymmetric Partition-wise JOIN

久々に PostgreSQL 本体機能へのパッチを投げたので、それの解説をしてみる。

PostgreSQL: Re: Asymmetric partition-wise JOIN

背景:Partition-wise JOIN

PostgreSQLパーティションを使ったときに、全く同一のパーティションの切り方をして、かつパーティションキーをJOINの結合条件に用いた場合に限り、パーティション子テーブル同士のJOINを先に行い、その結果を出力するという機能がある。

以下の例では、ptableおよびstableは、それぞれ列distの値を用いたHashパーティションを設定しており、3つずつの子テーブルを持つ。

postgres=# \d
                List of relations
 Schema |   Name    |       Type        | Owner
--------+-----------+-------------------+--------
 public | ptable    | partitioned table | kaigai
 public | ptable_p0 | table             | kaigai
 public | ptable_p1 | table             | kaigai
 public | ptable_p2 | table             | kaigai
 public | stable    | partitioned table | kaigai
 public | stable_p0 | table             | kaigai
 public | stable_p1 | table             | kaigai
 public | stable_p2 | table             | kaigai
(8 rows)

これをJOINする時の実行計画は以下の通り。

postgres=# explain select * from ptable p, stable s where p.dist = s.dist;
                                     QUERY PLAN
------------------------------------------------------------------------------------
 Hash Join  (cost=360.00..24617.00 rows=10000 width=49)
   Hash Cond: (p.dist = s.dist)
   ->  Append  (cost=0.00..20407.00 rows=1000000 width=12)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
   ->  Hash  (cost=235.00..235.00 rows=10000 width=37)
         ->  Append  (cost=0.00..235.00 rows=10000 width=37)
               ->  Seq Scan on stable_p0 s  (cost=0.00..60.77 rows=3277 width=37)
               ->  Seq Scan on stable_p1 s_1  (cost=0.00..62.69 rows=3369 width=37)
               ->  Seq Scan on stable_p2 s_2  (cost=0.00..61.54 rows=3354 width=37)
(11 rows)

おっとっと。
この場合、ptableパーティションの子テーブル3つと、stableパーティションの子テーブル3つをそれぞれ全て読み出し、メモリ上で結合した結果同士をHash Joinで結合する事がわかる。

Partition-wise JOINはデフォルトでは off になっているので、enable_partitionwise_joinパラメータをonにセットして明示的に有効化する必要がある。

postgres=# set enable_partitionwise_join = on;
SET
postgres=# explain select * from ptable p, stable s where p.dist = s.dist;
                                     QUERY PLAN
------------------------------------------------------------------------------------
 Append  (cost=101.73..19617.00 rows=10000 width=49)
   ->  Hash Join  (cost=101.73..6518.87 rows=3277 width=49)
         Hash Cond: (p.dist = s.dist)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Hash  (cost=60.77..60.77 rows=3277 width=37)
               ->  Seq Scan on stable_p0 s  (cost=0.00..60.77 rows=3277 width=37)
   ->  Hash Join  (cost=104.80..6527.08 rows=3369 width=49)
         Hash Cond: (p_1.dist = s_1.dist)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Hash  (cost=62.69..62.69 rows=3369 width=37)
               ->  Seq Scan on stable_p1 s_1  (cost=0.00..62.69 rows=3369 width=37)
   ->  Hash Join  (cost=103.47..6521.06 rows=3354 width=49)
         Hash Cond: (p_2.dist = s_2.dist)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
         ->  Hash  (cost=61.54..61.54 rows=3354 width=37)
               ->  Seq Scan on stable_p2 s_2  (cost=0.00..61.54 rows=3354 width=37)
(16 rows)

同じクエリに対して、set enable_partitionwise_join = onを設定した状態で実行計画を作らせると、対応するパーティションの子テーブル同士でJOINを行い、その次にAppend、つまり各パーティション子要素の処理結果を結合している事が分かる。

これはJOINの問題サイズを小さくし、多くの場合、Append処理に要するCPUサイクルを減らすことのできる優れた最適化テクニックではあるが、かなりの慎重さを以ってDB設計を行う必要がある。
なぜなら『全く同一のパーティションの切り方をして、かつパーティションキーをJOINの結合条件に用いる』事ができるよう、パーティション設計とSQLクエリの書き方を考える必要があるからである。

新機能:Asymmetric Partition-wise JOIN

同じようなノリで、非パーティションテーブルとパーティションテーブルの間のJOIN処理を、Appendの前に移動する事ができる。
例えば、非パーティションテーブルであるt1ptableとの間のJOINを (t1 \times ptable)と記述すると、これは (t1 \times ptable_p0) + (t1 \times ptable_p1) + (t1 \times ptable_p2)と展開する事ができる。

言い換えれば、t1を各パーティション子テーブルに分配する(クエリの書き換えに相当)事で、Appendよりも先にJOINを実行する事ができる。
もちろん、メリット・デメリットはあるので、先にパーティション子テーブルの内容を全て読み出した後でJOINを実行した方が良いというケースもある。

具体的に Asymmetric Partition-wise JOIN が活きてくるケースというのは

  • パーティション側のテーブルが、読み出しコストを無視できる程度に小さい。
    • 分配された分、複数回の読み出しが必要となってしまうため。
  • 子テーブルとのJOINにより、かなりの比率で行数が減ることが見込まれる。
    • 行数が減らなければ、Append処理は楽にならない。

というケースなので、あらゆる場合で効果があるかというと、そのような銀の弾丸はない。

では試してみることにする。
本来、以下のような実行計画となったいたクエリであるが・・・

postgres=# explain select * from ptable p, t1 where p.a = t1.aid;
                                    QUERY PLAN
----------------------------------------------------------------------------------
 Hash Join  (cost=2.12..24658.62 rows=49950 width=49)
   Hash Cond: (p.a = t1.aid)
   ->  Append  (cost=0.00..20407.00 rows=1000000 width=12)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
   ->  Hash  (cost=1.50..1.50 rows=50 width=37)
         ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
(8 rows)

同様に enable_partitionwise_join パラメータによって当該機能は有効化され、以下のような『賢い』実行計画を生成するようになる。

postgres=# explain select * from ptable p, t1 where p.a = t1.aid;
                                    QUERY PLAN
----------------------------------------------------------------------------------
 Append  (cost=2.12..19912.62 rows=49950 width=49)
   ->  Hash Join  (cost=2.12..6552.96 rows=16647 width=49)
         Hash Cond: (p.a = t1.aid)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=2.12..6557.29 rows=16658 width=49)
         Hash Cond: (p_1.a = t1.aid)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=2.12..6552.62 rows=16645 width=49)
         Hash Cond: (p_2.a = t1.aid)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
(16 rows)

t1テーブルが各パーティション子テーブル毎に読み出され、その結果を Append によって結合している事がお分かりだろうか。
しかも、最初のケースでは Append は 100万行を処理しなければならないところ、後者ではHash Joinによって大幅に行数が削られているため、たった49950行(推定値)しか処理しない事になっている。これに伴う推定コストの低下によって、Asymmetric Partition-wise JOINを使った実行計画が選択された。

ちなみに、結合すべきテーブルを増やしても、(コスト的に見合えば)各パーティション子テーブルへと分配してくれる。

postgres=# explain select * from ptable p, t1, t2 where p.a = t1.aid and p.b = t2.bid;
                                       QUERY PLAN
----------------------------------------------------------------------------------------
 Append  (cost=4.25..19893.99 rows=2495 width=86)
   ->  Hash Join  (cost=4.25..6625.83 rows=832 width=86)
         Hash Cond: (p.b = t2.bid)
         ->  Hash Join  (cost=2.12..6552.96 rows=16647 width=49)
               Hash Cond: (p.a = t1.aid)
               ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
               ->  Hash  (cost=1.50..1.50 rows=50 width=37)
                     ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t2  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=4.25..6630.20 rows=832 width=86)
         Hash Cond: (p_1.b = t2.bid)
         ->  Hash Join  (cost=2.12..6557.29 rows=16658 width=49)
               Hash Cond: (p_1.a = t1.aid)
               ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
               ->  Hash  (cost=1.50..1.50 rows=50 width=37)
                     ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t2  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=4.25..6625.48 rows=831 width=86)
         Hash Cond: (p_2.b = t2.bid)
         ->  Hash Join  (cost=2.12..6552.62 rows=16645 width=49)
               Hash Cond: (p_2.a = t1.aid)
               ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
               ->  Hash  (cost=1.50..1.50 rows=50 width=37)
                     ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t2  (cost=0.00..1.50 rows=50 width=37)
(28 rows)

PG-Stromとの関連

この機能は、もちろんピュアなPostgreSQLで大量データを扱うときに効果を発揮するものである。

ただそれ以上に、PG-Stromのように『データが物理的にどこに存在するのか』を気にするソリューションだと更に大きな恩恵がある。

例えば、日々大量に溜まっていくログデータを、パーティション分割によって複数のディスクに物理的に分割格納し、集計・解析時にはこれを他のテーブルとJOIN/GROUP BYするという、比較的ありがちな処理を考えてみる。

もし非パーティションテーブルとパーティションテーブルのJOINが必ずAppendの後であれば、SSD-to-GPU Direct SQLを使ったとしても、CPUが処理すべき行数はほとんど減ることはない*1ため、その後のJOIN/GROUP BY処理でGPUを使えたとしても、データの流れは Disk → CPU/RAM → GPU → CPU/RAM となってしまうため、データのピンポンによって転送効率はかなり悪化してしまう。

Asymmetric Partition-wise JOINによって、非パーティションテーブルとパーティション子テーブルのJOINを先に実行できるようになれば、これはSSD-to-GPU Direct SQLがフル稼働できる状況になり、データの流れも Disk → GPU → CPU/RAM とかなりシンプルになる。

これはとりわけ、以下のような構成のマシンを用いてPCIeスイッチ経由でSSDGPU間でのダイレクトデータ転送を行う場合に顕著で、SQLワークロードの大半をPCIeスイッチより外側のNVME-SSDGPUのペアで実行する事ができるので、データが全くCPUを経由することなく処理されてしまう(= シングルノードでもPCIeバスの限界までスケールできる)という事になってしまう。

こういった諸々面白い事ができるようになるので、ぜひ皆さんにもCommitFestに登録されたパッチのレビューに参加していただければ幸いである。

*1:スキャン条件のみで大半の行を落とせるという幸運な場合を除く

技術負債を返した話(Pre-built GPU Binary対応)

最もプリミティブなPG-Stromの処理は、ユーザが入力したSQLを元にCUDA CのGPUプログラムを自動生成し、これを実行時コンパイル。ここで生成されたGPUバイナリを用いて、ストレージから読み出したデータをGPUで並列処理するという一連の流れである。
後にJOIN/GROUP BYの対応や、データ型の追加、SSD-to-GPU Direct SQLなど様々な機能を付加したものの、このコード生成と実行時ビルドの仕組みは2012年に最初のプロトタイプを公開した時から大きく変わってはいない。

コードの自動生成を担当するのはcodegen.cで、この人が吐いたCUDA Cプログラムをcuda_program.cがNVRTC (NVIDIA Run-Time Compiler) を使ってコンパイルし、GPUバイナリを生成する。

ただ、当初の『全件スキャンWHERE句・固定長データ型のみ』というシンプルなコード生成が、やがてJOIN/GROUP BYや、numeric型やtext型の対応など、より複雑な処理をGPUで実行するようになるにつれて、自動生成部分だけでなく、静的に書いておいた部分と組み合わせてGPUプログラムを生成するようになる。
これは当然で、例えばGPUでHash-Joinを実行する場合でも、クエリを処理するたびに変わる可能性があるのは、二つのテーブルを結合するJoin-Keyの結合条件を評価する部分だけで、Hash-Joinの基本的なアルゴリズムは同じだからである。

これらの静的なCUDA Cコードは、ヘッダファイル(*.h)として記述され、cuda_program.cでコードをコンパイルする時にインクルードする事で、動的生成したコードと組み合わせて使用していた。

ただし…。
PG-Stromの機能が増え、静的なCUDA Cコードの割合が増えるにつれ実行時コンパイルの時間は徐々に増えていく事になる。
intやtextといった基本的なデータ型の比較といった、シンプルなコードであれば気になる程ではないが、例えば、(コーナーケースではあるが)複合型(Composite Type)のデータをGPU側で生成する場合などは、対応する全てのデータ型が関わってくるため、ビルドに要する時間がSQLの実行時間よりも遥かに長くなるという本末転倒な状況が起こる。
その場合、一度ビルドしたGPUバイナリは共有メモリ上にキャッシュされるため、初回と2回目以降のSQL実行時間が大幅に異なるという事になってしまう。

そこで、ほぼ一週間を丸々要してGPUコードの実行時コンパイルに係る部分のリファクタリングを行った。
要は、静的なCUDA Cコードは予めコンパイルして、実行時にはバイナリをリンクすれば済む話なので、ある程度以上に複雑な構造を持つ関数は cuda_gpujoin.cucuda_numeric.cuという形で切り出し、PG-Strom自体のビルド時に、NVCCを用いてGPUバイナリファイルを生成するようにした。
これらは動的に生成されたコードと実行時リンクされ、最終的にはこれまでと同じ処理を行うようになる。
静的な部分は Fatbin 形式という形でインストールされる。この形式は、GPUの各世代(Pascal, Volta, Turing)向けに最適化されたバイナリを内部にアーカイブし、ターゲットGPUに最も適したバイナリをリンク時に使用してくれる。

なので、インストールされたPG-Strom関連ファイルを見てみると、目新しいファイルが並んでいる…という事になる。(*.gfatbin ファイルはデバッグオプションを有効にしてビルドしたバージョン。pg_strom.debug_jit_compile_options=onの時にはこちらが利用される)

[kaigai@magro ~]$ ls /usr/pgsql-11/share/pg_strom
arrow_defs.h          cuda_gpupreagg.fatbin   cuda_gpusort.h        cuda_numeric.gfatbin    cuda_textlib.gfatbin
cuda_basetype.h       cuda_gpupreagg.gfatbin  cuda_jsonlib.fatbin   cuda_numeric.h          cuda_textlib.h
cuda_common.fatbin    cuda_gpupreagg.h        cuda_jsonlib.gfatbin  cuda_plcuda.h           cuda_timelib.fatbin
cuda_common.gfatbin   cuda_gpuscan.fatbin     cuda_jsonlib.h        cuda_primitive.fatbin   cuda_timelib.gfatbin
cuda_common.h         cuda_gpuscan.gfatbin    cuda_misclib.fatbin   cuda_primitive.gfatbin  cuda_timelib.h
cuda_gpujoin.fatbin   cuda_gpuscan.h          cuda_misclib.gfatbin  cuda_primitive.h        cuda_utils.h
cuda_gpujoin.gfatbin  cuda_gpusort.fatbin     cuda_misclib.h        cuda_rangetype.h        pg_strom--2.2.sql
cuda_gpujoin.h        cuda_gpusort.gfatbin    cuda_numeric.fatbin   cuda_textlib.fatbin

ベンチマーク

GPUコードのビルドに要する時間を計測するため、何種類かのテスト用クエリを作成し、その実行時間を計測してみた。
とりあえずクエリは以下の7種類で計測してみた。

-- Q1 ... シンプルなGpuScan + Int型の演算
SELECT id, aid+bid FROM t0 WHERE aid < 100 AND bid < 100;

-- Q2 ... Numeric型を利用するGpuScan
SELECT id, a+b, a-c FROM t_numeric1 WHERE a < 50.0 AND b < 50.0;

-- Q3 ... Timestamp型とEXTRACT()文を使用するGpuScan
SELECT id, a, b FROM t_timestamp1 WHERE EXTRACT(day FROM a) = 19 and EXTRACT(month FROM b) = 6;

-- Q4 ... Arrow_Fdw外部テーブルからシンプルなデータ(Int, Timestamp)の読み出し
SELECT id,ymd FROM t_arrow WHERE id % 100 = 23;

-- Q5 ... Arrow_Fdw外部テーブルから複合型(Composite)の読み出し
SELECT id,v FROM t_arrow WHERE id % 100 = 23;

-- Q6 ... GpuJoinを含むクエリ
SELECT id, aid, atext FROM t0 NATURAL JOIN t1 WHERE bid < 50 AND cid < 50;

-- Q7 ... GpuPreAgg + GpuJoinを含むクエリ
SELECT cat, count(*), sum(ax) FROM t0 NATURAL JOIN t1 GROUP BY cat;

ディスクからの読み出しの影響を排除するため、関連する全てのテーブルをpg_prewarmでオンメモリ状態にした上で、上記の7つのクエリをそれぞれ2回ずつ実行した。
以下の表/グラフはその結果であるが、静的なCUDA Cコードを事前にビルドしておく事で、初回の実行時間が大幅に改善しているのが分かる。もちろん、最終的なGPUバイナリはほぼ同じ形になるので、ビルド済みGPUバイナリのキャッシュを参照できる2回目以降では処理時間の大きな差はない。(単位は全てms)
ちなみにQ5はめちゃめちゃ時間がかかっている。これは、Arrow形式(列データ)から複合型のデータをGPU上で再構成し、CPUへは行データとして返すという、非常にコード量の多い処理を行っているためである。

旧方式(1回目) 旧方式(2回目) 新方式(1回目) 新方式(2回目)
Q1 1,199.6 331.7 369.0 321.5
Q2 3,279.2 222.0 421.6 227.5
Q3 3,213.5 206.5 462.6 219.2
Q4 793.7 134.0 355.1 147.4
Q5 17,872.2 141.4 364.7 144.8
Q6 1,928.5 352.3 605.3 345.9
Q7 2,435.6 359.2 729.1 349.1

分かりやすいように、Pre-built GPU Binary機能の有無別に(2回目の実行時間)-(1回目の実行時間)を計算してみる。2回目は共有メモリ上のキャッシュを参照するだけなので、これが正味のビルド時間という事になるが、これまでは『一呼吸おいて』という感じだった部分がコンマ何秒程度にまで短くなった事が分かる。

背景

実はこの機能、別の要件を考えた時にどうしても急いで作らざるを得ない事情があった。
これまでユーザさんからの新機能の要求を優先して実装していたために、テストケースの作成やテスト自動化が後回しになってきた経緯があったのだが、エンタープライズ向けにはこれらソフトウェア品質改善/品質保証のためのプロセスは必要不可欠であり、今年の後半は少し腰を落ち着けてテストの充実を図ろうと考えている。
ただ、大量のテストを流すとなると、相応のバリエーションのSQLを実行する事となり、その度にGPUコードの自動生成と実行時コンパイルが走るのでは、テストケースの実行に非常に長い時間を要してしまう事になる。これではPG-Stromのテストをしているのか、コンパイラを動かしているのか分からない!

SSDtoGPU Direct SQL on Columnar-store (Apache Arrow)

I have recently worked on development of FDW for Apache Arrow files; including SSDtoGPU Direct SQL support of PG-Strom. Apache Arrow is a column-oriented data format designed for application independent data exchange, supported by not a small number of "big-data" software.
The latest revision of PG-Strom can use Apache Arrow files as data source of the query execution, through the Arrow_Fdw feature. This article introduces the brief overview of Arrow_Fdw, its benchmark results and our future plan.

What is Arrow_Fdw

PostgreSQL supports FDW (Foreign Data Wrapper) mechanism that enables to read variable external data source as if here is a normal table. Some kind of FDW also support writing.
Of course, external data source has different data structure from the internal representation of PostgreSQL rows / columns. For example, CSV file is a way to represent structured data based on comma separated text. PostgreSQL cannot understand this format without proper conversion, so file_fdw extension intermediate the world of PostgreSQL and the world of CSV file.

This mechanism can be applied variable kind of data sources. Here is postgres_fdw that uses a remote PostgreSQL server as data source. An unique one is twitter_fdw that fetches tweets from the Twitter timeline via WebAPI.
The Arrow_Fdw maps files in Apache Arrow file for references by SQL.
Below is an example to map an Arrow file (/opt/nvme/t.arrow) using Arrow_Fdw.

postgres=# IMPORT FOREIGN SCHEMA hoge
             FROM SERVER arrow_fdw
             INTO public OPTIONS (file '/opt/nvme/t.arrow');
IMPORT FOREIGN SCHEMA
postgres=# \d hoge
                   Foreign table "public.hoge"
 Column |  Type   | Collation | Nullable | Default | FDW options
--------+---------+-----------+----------+---------+-------------
 id     | integer |           |          |         |
 aid    | integer |           |          |         |
 bid    | integer |           |          |         |
 ymd    | date    |           |          |         |
 cat    | text    |           |          |         |
 md5    | text    |           |          |         |
Server: arrow_fdw
FDW options: (file '/opt/nvme/t.arrow')

Of course, you can use CREATE FOREIGN TABLE command as usual, however, Arrow file contains its schema definition, and columns-list must match exactly. Our recommendation is IMPORT FOREIGN SCHEMA because it automatically generates foreign table definition.

Overview of SSDtoGPU Direct SQL on Arrow_Fdw


PG-Strom has a special storage optimization mode called SSD-to-GPU Direct SQL. It utilizes P2P DMA over PCIe bus, on top of GPUDirect RDMA kernel APIs provided by NVIDIA.
Usually, PostgreSQL loads table's contents onto RAM from the storage, then processes the loaded data. In typical analytics workloads, not a small portion of the loaded data is filtered out. In other words, we consumes the narrow storage bandwidth to carry junks; that is waste of I/O resources.
When PG-Strom uses SSD-to-GPU Direct SQL, it loads table's contents to GPU's device memory directly, then GPU runs SQL workloads using its thousands cores in parallel. It likely reduce amount of data to be loaded onto the host system much much smaller than the original source.

Even if data source are Arrow files, overall mechanism is same. PG-Strom loads only referenced column data using P2P DMA, then GPU processes SQL workloads (WHERE-clause/JOIN/GROUP BY). Unlike row-data of PostgreSQL tables, it does not load unreferenced columns.

Benchmark

We run the Star Schema Benchmark (SSBM) with scale-factor=401 as usual. Its database size is 353GB in total. We exported its lineorder table into an Arrow file on NVME-SSD volume.

model Supermicro SYS-1019GP-TT
CPU Intel Xeon Gold 6126T (2.6GHz, 12C) x1
RAM 192GB (32GB DDR4-2666 x6)
GPU NVIDIA Tesla V100 (5120C, 16GB) x1
SSD Intel SSD DC P4600 (HHHL; 2.0TB) x3
(striped by md-raid0)
HDD 2.0TB (SATA; 72krpm) x6
network 10Gb ethernet x2 ports
OS Ret Hat Enterprise Linux 7.6
CUDA 10.1 + NVIDIA Driver 418.40.04
DB PostgreSQL v11.2
PG-Strom v2.2devel

The benchmark results are below:
f:id:kaigai:20190430235505p:plain

Due to column-oriented data structure, performance is variable a lot depending on the number of referenced columns.
The query response time becomes much faster than the previous row-data based benchmark results; 7.2-7.9GB/s by PG-Strom and 2.2-2.3GB/s by PostgreSQL with parallel-scan. The query execution throughput is an inverse of query response time. So, 49GB/s means that summarizing of sequential-scan results on 300GB+ table is finished about 6sec. 25GB/s also means similar workloads are finished about 12sec. Once we can achieve this grade of batch processing performance in a single node, it changes some assumptions when we consider system configurations.

Validation of the benchmark results

For confirmation, we like to validate whether the benchmark results are reasonable. Below is definition of the flineorder foreign table that maps an Arrow file using Arrow_Fdw.
The above benchmark results say it runs Q2_1 in 28.7GB/s. The Q2_1 references lo_suppkey, lo_orderdate, lo_orderpriority and lo_supplycost.

 Foreign table "public.flineorder"
       Column       |     Type      | Size
--------------------+---------------+--------------------------------
 lo_orderkey        | numeric       | 35.86GB
 lo_linenumber      | integer       |  8.96GB
 lo_custkey         | numeric       | 35.86GB
 lo_partkey         | integer       |  8.96GB
 lo_suppkey         | numeric       | 35.86GB   <-- ★Referenced by Q2_1
 lo_orderdate       | integer       |  8.96GB   <-- ★Referenced by Q2_1
 lo_orderpriority   | character(15) | 33.61GB   <-- ★Referenced by Q2_1
 lo_shippriority    | character(1)  |  2.23GB
 lo_quantity        | integer       |  8.96GB
 lo_extendedprice   | bigint        | 17.93GB
 lo_ordertotalprice | bigint        | 17.93GB
 lo_discount        | integer       |  8.96GB
 lo_revenue         | bigint        | 17.93GB
 lo_supplycost      | bigint        | 17.93GB   <-- ★Referenced by Q2_1
 lo_tax             | integer       |  8.96GB
 lo_commit_date     | character(8)  | 17.93GB
 lo_shipmode        | character(10) | 22.41GB
FDW options: (file '/opt/nvme/lineorder_s401.arrow')  ... file size = 310GB

Total size of the referenced columns is 96.4GB of 310GB (31.1%). So, raw storage read throughput is 96.4GB / 11.06s = 8.7GB/s. This is reasonable performance for 3x Intel DC P4600 NVME-SSDs.

Future Plan: Towards 100GB/s

We can say the current status is "just workable", so we have to put further works to improvement software quality for production grade.

For more performance improvement, we like to have a benchmark activity using multi-GPU server like this.

This configuration uses Supermicro SYS-4029GP-TRT2 that is designed for HPC by RDMA optimization with PCIe switches. It allows to install up to 4x Tesla GPUs and 4x HBA card *1 to attach external NVME-SSD enclosure. An HBA card can connect 4x NVME-SSD, and can read the storage up to 12.8GB/s.
So, we can configure 4 of unit of 1xGPU + 4xSSD (12.8GB/s). It is 4xGPU + 16xSSD (51.2GB/s) in total.
If we can assume summarizing / analytic query read 1/3 columns in average, the upper limit of the query execution is 51.2GB/s / 0.33 = 150GB/s from the standpoint of raw storage performance.

Probably, 100GB/s is a feasible milestone for PostgreSQL, and we like to run the benchmark in 2019.

*1:One other option is 100Gb-NIC for NVME-over-Fabric. It is more scalable configuration.

Dive into Apache Arrow(その3)- SSD-to-GPU Direct SQL対応

ここ最近取り組んでいた Arrow_Fdw 機能がようやく動くようになったので、性能ベンチマークを行ってみた。
今回のエントリでは順を追って説明する事にしてみたい。

Arrow_Fdwとは

PostgreSQLにはFDW (Foreign Data Wrapper) という機能があり、PostgreSQL管理下にあるデータ(テーブルなど)だけでなく、外部のデータソースをあたかもテーブルであるかのように読み出す事ができる。一部のモジュールでは書込みにも対応している。
例えば、CSVファイルは構造化データを表現するための方法の一つである。もちろん、PostgreSQLの内部形式とデータ形式が違うので、そのままではコレをSQLの世界から取り扱う事ができないが、間にfile_fdwというモジュールが介在する事で、CSVPostgreSQLの内部形式へとデータ形式を変換している。

この枠組みは多種多様なデータソースに対して適用する事が可能で、リモートのPostgreSQLサーバをデータソースとするpostgres_fdwや、変わり種としては、Web APIを用いてTwitterのタイムラインからデータを取得するtwitter_fdw*1などがある。
これを利用してApache Arrow形式のファイルを外部テーブルにマップし、SQLから参照できるようにするのがArrow_Fdwモジュールである。

例えば、/opt/nvme/t.arrowというArrow形式ファイルをArrow_Fdwを用いてマップする場合は、以下の構文を用いる事ができる。

postgres=# IMPORT FOREIGN SCHEMA hoge
             FROM SERVER arrow_fdw
             INTO public OPTIONS (file '/opt/nvme/t.arrow');
IMPORT FOREIGN SCHEMA
postgres=# \d hoge
                   Foreign table "public.hoge"
 Column |  Type   | Collation | Nullable | Default | FDW options
--------+---------+-----------+----------+---------+-------------
 id     | integer |           |          |         |
 aid    | integer |           |          |         |
 bid    | integer |           |          |         |
 ymd    | date    |           |          |         |
 cat    | text    |           |          |         |
 md5    | text    |           |          |         |
Server: arrow_fdw
FDW options: (file '/opt/nvme/t.arrow')

もちろん、いつも通りにCREATE FOREIGN TABLE構文を用いてもよいが、Arrow形式ファイルにはそれ自体スキーマ定義が含まれており、列リストの定義がこれと厳密に一致していないとエラーとなるため、IMPORT FOREIGN SCHEMA構文を用いてArrow形式ファイルから自動的に外部テーブルの定義を作り出す方がお勧めである。

Apache Arrowファイル形式自体の説明は、こちらのエントリをご覧いただきたい。
kaigai.hatenablog.com

要は、列指向でデータが編成されているため、集計・解析系ワークロードに対して全てのデータをストレージから読み出す必要はなく、被参照列のデータのみを読み出せばよいためにI/Oを効率的に(高い密度で)行う事が可能となるという話である。

ベンチマーク

早速ベンチマークを行ってみる。使用したのは、いつもの1Uラックサーバ でスペックおよびシステム構成は以下の通り。

chassis Supermicro SYS-1019GP-TT
CPU Intel Xeon Gold 6126T (2.6GHz, 12C) x1
RAM 192GB (32GB DDR4-2666 x6)
GPU NVIDIA Tesla V100 (5120C, 16GB) x1
SSD Intel SSD DC P4600 (HHHL; 2.0TB) x3
(md-raid0によるストライピング構成)
HDD 2.0TB (SATA; 72krpm) x6
network 10Gb ethernet x2 ports
OS Ret Hat Enterprise Linux 7.6
CUDA 10.1 + NVIDIA Driver 418.40.04
DB PostgreSQL v11.2
PG-Strom v2.2devel

ベンチマークに使用したワークロードは、いつもの Star Schema Benchmark で、これも今まで通り scaling factor=401 なので、DBサイズは353GB。これをExt4で初期化したNVME-SSD区画上にArrow形式ファイルとして書き出すと約310GBであった。

実行結果は以下の通り。じゃん。

列指向データなので、クエリが参照する列の数と幅によって大幅にパフォーマンスが変わってくるのは致し方ないが、2.2~2.3GB/s前後であったPostgreSQL v11や、7.2~7.9GB/s前後であったPG-Strom v2.2develの結果と比べて、大幅に性能が改善されている事が分かる。
単なる逆数ではあるが、49GB/sというのは300GB超のテーブルに対する全件スキャン+集計が6秒台で返ってくる速度、25GB/sというのは12秒台で返ってくる速度なので、これ位のデータ処理能力をシングルノードで実現できるのであれば、システム構成を検討する時の様々な前提条件が変わってくるだろう。

こちらはもう一つ別の指標で。このベンチマークを採取するにあたり、Star Schema Benchmark 全13本のクエリを3回ずつ実行し、クエリ毎の最良値を使ってスループットを計算しているが、13本 x 3回 = 39回のクエリを実行するのに要した時間を積算したもの。

PG-Strom + Arrow_Fdwのケースだと、ほんの僅か、子供のオムツ替え程度の時間であるが、元々のPostgreSQLのケースでは2時間弱と長めのお昼寝程度の時間を要している。
しかもこれは全て同一のハードウェア、特にデータを全て高速なNVME-SSDに載せているがために、元々のPostgreSQLでも2.2~2.3GB/sのパフォーマンスが出ているが、既存システムでAll-Flashという構成は多くはないと考えられる。旧来の磁気ディスクを用いたDBシステムとの比較であれば、どの程度のスコアになるだろう??

測定結果の妥当性

なお、測定ミスなどであると恥ずかしいので、性能の妥当性を考察してみた。

以下は Arrow_Fdw を用いてArrow形式ファイルをマップした flineorder テーブルの列定義であるが、28.7GB/sのクエリ実行スループットを記録した Q2_1 を例に取ると、lo_suppkeylo_orderdatelo_orderpriorityおよびlo_supplycost列が参照されている。

 Foreign table "public.flineorder"
       Column       |     Type      | Size
--------------------+---------------+--------------------------------
 lo_orderkey        | numeric       | 35.86GB
 lo_linenumber      | integer       |  8.96GB
 lo_custkey         | numeric       | 35.86GB
 lo_partkey         | integer       |  8.96GB
 lo_suppkey         | numeric       | 35.86GB   <-- ★Q2_1で参照
 lo_orderdate       | integer       |  8.96GB   <-- ★Q2_1で参照
 lo_orderpriority   | character(15) | 33.61GB   <-- ★Q2_1で参照
 lo_shippriority    | character(1)  |  2.23GB
 lo_quantity        | integer       |  8.96GB
 lo_extendedprice   | bigint        | 17.93GB
 lo_ordertotalprice | bigint        | 17.93GB
 lo_discount        | integer       |  8.96GB
 lo_revenue         | bigint        | 17.93GB
 lo_supplycost      | bigint        | 17.93GB   <-- ★Q2_1で参照
 lo_tax             | integer       |  8.96GB
 lo_commit_date     | character(8)  | 17.93GB
 lo_shipmode        | character(10) | 22.41GB
FDW options: (file '/opt/nvme/lineorder_s401.arrow')  ... file size = 310GB

これら被参照列の行データサイズの合計は、310GB中96.4GBである。つまり、このクエリではファイル全体の31.1%だけしかデータを読み出す必要がなく、読出しサイズ(96.4GB)をQ2_1の実行時間11.06sで割ると、96.4÷11.06 = 8.7GB/s なので、Intel DC P4600 x3 の読出し速度としては概ね妥当な値という事になる。(一安心である)

今後の展望

現状「とりあえず動きました」という段階なので、まだこの後、試験や修正などに相応の時間がかかる事はご容赦いただきたい。

更なる性能改善に向け、今後やってみたいと考えているのが、マルチGPUを搭載したサーバによるベンチマーク

上記の構成イメージは、Supermicro社のHPC向けサーバ SYS-4029GP-TRT2に、Tesla GPUを4台と、外付けNVME-SSDのエンクロージャを接続するHBAカード*2を4台搭載する。このマシンはPCIeスイッチを搭載しているので、SSDGPU間で直接データ転送を行う場合にはCPUに負荷を与えない。

ブロック図で記載すると以下のようになる。GPU 1台あたりNVME-SSD 4台が一個のユニットとなり、一台のサーバの中でそれぞれが並列に動作する。

今回のベンチマークは、PCIeスイッチが無いとか、SSDの台数が3台だという細かな違いを除けば、概ねこちらの構成で1ユニット分の性能であると言う事ができるだろう。それで15GB/s~49GB/sの性能値を出せているという事は、期待値としてその4倍、シングルノード100GB/sの処理性能というのはあながち実現不可能な絵空事という訳でもないだろう。


こちらは、今年の正月に品川神社に奉納した絵馬。
ちょっと目標値の設定が低かったかもしれぬ。

*1:今でもメンテナンスされてるんだろうか?

*2:NVMEoFを使う場合、100Gb-NICでも可。むしろ巨大データだとこちらの方がスケーラビリティは高くなる。