 | レベル: 中級 Jonathan Bartlett (johnnyb@eskimo.com), Director of Technology, New Medio
2007年 10月 16日 連載第 1 回では、Cell Broadband Engine プロセスへの基本的な移植方法を説明しました。この 2 回目の記事ではさらに詳細に踏み込んで、DMA 転送サイズに基づく制約をなくす方法、プログラムを複数の SPE で実行できるように分割する方法、そしてプログラムの処理速度を一層向上させる方法を説明します。
連載第 1 回目では、計算が集中する関数を SPE に任せることによって Cell/B.E. プロセッサー用アプリケーションを強化する方法を説明しました。その基本的方法として、非侵入型フレームワークを開発し、他のプラットフォームやビルド・システムに悪影響を与えることなく SPE 固有のコードをマルチプラットフォーム・プロジェクトに追加できるようにしましたが、それによって完成したシステムは以下のいくつかの点で最適だとは言えません。
- 4 の倍数個の値リストしか許容されないこと。
- 値の許容数が最大 4,096 個であること。
- SPE ではベクトル化を行わないこと。
- 使用するのが 1 つの SPE のみであること。
そこで、今回の記事では 1 回目の記事で開発したコードのパフォーマンスを改善するとともに一層堅牢なものにします。
基本マクロを使用する
本題に入る前に、いくつかの基本マクロを定義しておきます。これらのマクロは、アライメント、比較、間接参照やその他の操作を行うコードで頻繁に使うことになります。以下に記載するマクロには、最初の記事で紹介したものも含まれます。
リスト 1. speport.h の追加マクロ
/* Alignment macros */
#define SPE_ALIGNMENT 16
#define SPE_ALIGNMENT_FULL 128
#define SPE_ALIGN __attribute__((aligned(16)))
#define SPE_ALIGN_FULL __attribute__((aligned(128)))
/* Rounding macros */
/* Go _up_ to the next address based on an alignment */
#define ROUND_UP_ALIGN(value, alignment) (((value) + \
((alignment) - 1))&(~((alignment)-1)))
/* Go _down_ to the next address based on an alignment */
#define ROUND_DOWN_ALIGN(value, alignment) ((value)&(~((alignment) - 1)))
/* Divide and round up */
#define DIVIDE_ROUND_UP(value, divisor) (((value) + (divisor) - 1) / (divisor))
/* In case these aren't already defined */
#ifndef MIN
#define MIN(X,Y) (((X) < (Y)) ? (X) : (Y))
#endif
#ifndef MAX
#define MAX(X,Y) (((X) > (Y)) ? (X) : (Y))
#endif
/* Simplify Dereferencing on the SPE (to deref other types, change the typecase at the */
/* beginning) */
#define SPE_DEREF_UINT32(base, offset) *((unsigned int *)(((char *)(base)) + (offset)))
/* Give a 16-byte structure for "status" so that it can be used in an array without */
/* causing alignment errors */
typedef struct {
int status SPE_ALIGN;
} spe_remote_function_status_t;
|
これらの関数の重要性は、サンプルを見ていくうちに納得できるはずです。
制約を排除する
第一に考えられる明らかなコードの改善方法は、制約を排除することです。現時点でのコードは SPE DMA 転送のアライメント上の制約により、4 個の値 (4 * 4 バイト浮動小数値 (float) = 16 バイト) の倍数サイズの値リストに制約されます。また、SPE DMA 転送のサイズ上の制約によって、プログラムには最大 4,096 の値という制約も課せられています。いずれにしても、望ましくない制約です。
最も簡単な修正はサイズの制約です。サイズの制約は、リストを一部ずつ処理する while ループに SPE コードを組み込むだけで修正できます。このループは以下のようなものです (spe_std_dev.spuc で spe_calculate_standard_deviation 関数の最初のほうに追加します)。
リスト 2. プログラムへのループの追加
float spe_calculate_standard_deviation(int num_values, float *values_ea) {
/* Previous Data Definitions Go Here */
/* ..... */
/* Calculate the number of iterations we are going to need */
int num_values_remaining = num_values;
float *current_values_ea = values_ea;
while(num_values_remaining > 0) {
int values_this_cycle = MIN(num_values_remaining, MAX_VALUES);
/* Previous code goes here */
/* Replace values_ea with current_values_ea */
/* Replace num_values with values_this_cycle */
/* ..... */
num_values_remaining -= values_this_cycle;
current_values_ea += values_this_cycle;
}
/* Rest of function from original */
/* Finalize Computation */
avg = sum / (float)num_values;
variance = (sum_squares - (sum * avg)) / (float)num_values;
std_dev = sqrt(variance);
return std_dev;
}
|
上記のループを追加したことで、処理可能な値の個数に対する最大サイズの制限は除去されました。したがって、main 内の値の処理数をチェックするための条件も除去する必要があります (条件はコメント /* Check boundaries */ で始まります。else 分岐は残してください)。
しかしながら、値の個数がまだ問題として残ります。値の個数は依然として 4 の倍数でなければなりません。この問題を回避するには、最大 3 個の値の DMA を個別に行う必要があります。なぜなら、DMA システムは 16 バイトの倍数を転送することも、自然にアラインされた値を転送することも可能だからです。これはつまり、値が 4 バイトにアラインされてさえいれば、単一の 4 バイトの値を転送できるということを意味します。アライメントを保証するのは簡単です。malloc はすでに、すべての値を 16 バイト境界に割り当てるものに置き換えています。
転送サイズに関しては、2 つの方法で転送をプログラムに統合することができます。1 つ目の方法は、ループがリストのアラインされていない部分を検出した場合には、一度に 1 個だけ値を処理するようにループ自体を変更する方法です。もう 1 つの方法では、特殊なケースを処理する別のループを追加します。この場合、どちらのループもアライメントの問題を扱うことができなければなりません (この点は、リストを分割する際にさらに重要になります)。
これを実現するには、例外ケースで一度に 1 個だけ値を処理するようにループを以下のように書き直します。
リスト 3. DMA 用にアラインされていない値を処理するループへの変更
while(num_values_remaining > 0) {
int values_this_cycle = MIN(num_values_remaining, MAX_VALUES);
/* Check to see if we the leading values are unaligned. If so, go one */
/* value at a time until they are aligned. */
if(((int)current_values_ea % DMA_TRANSFER_UNIT) != 0) {
values_this_cycle = 1;
} else {
/* Check to see if this is a size that is transferable in one block */
|-------10--------20--------30--------40--------50--------60--------70--------80--------9|
|-------- XML error: The previous line is longer than the max of 90 characters ---------|
if((values_this_cycle % DMA_TRANSFER_UNIT_FLOAT) != 0) {
/* Check to see if we can transfer a larger sub-block in a */
|-------10--------20--------30--------40--------50--------60--------70--------80--------9|
|-------- XML error: The previous line is longer than the max of 90 characters ---------|
/* single transfer */
int tmp_num_values = values_this_cycle - (values_this_cycle %
|-------10--------20--------30--------40--------50--------60--------70--------80--------9|
|-------- XML error: The previous line is longer than the max of 90 characters ---------|
DMA_TRANSFER_UNIT_FLOAT);
if(tmp_num_values > 0) {
/* If so, use this value */
values_this_cycle = tmp_num_values;
} else {
/* Otherwise, just go one value at a time */
values_this_cycle = 1;
}
}
}
/* Load values in from main memory pointer */
/* If we are doing one at a time, then we need to check for */
/* alignment issues */
if(values_this_cycle == 1) {
/* Unaligned addresses require similarly unaligned destinations - */
|-------10--------20--------30--------40--------50--------60--------70--------80--------9|
|-------- XML error: The previous line is longer than the max of 90 characters ---------|
/* calculate the offset */
int natural_offset = (((int)current_values_ea / sizeof(float)) %
DMA_TRANSFER_UNIT_FLOAT);
/* Perform the Transfer */
spu_mfcdma32(&ls_values[natural_offset],
(unsigned int)current_values_ea, values_this_cycle * sizeof(float),
DEFAULT_DMA_TAG, MFC_GET_CMD);
spu_mfcstat(MFC_TAG_UPDATE_ALL);
/* Move it back into position for processing */
ls_values[0] = ls_values[natural_offset];
} else {
/* Regular properly-aligned, properly-sized transfer */
spu_mfcdma32(ls_values, (unsigned int)current_values_ea,
values_this_cycle * sizeof(float), DEFAULT_DMA_TAG, MFC_GET_CMD);
spu_mfcstat(MFC_TAG_UPDATE_ALL);
}
for(i = 0; i < values_this_cycle; i++) {
sum += ls_values[i];
sum_squares += ls_values[i]*ls_values[i];
}
num_values_remaining -= values_this_cycle;
current_values_ea += values_this_cycle;
}
|
上記のように変更したら、以下の定義を speport.h に追加してください。
#define DMA_TRANSFER_UNIT 16
#define DMA_TRANSFER_UNIT_FLOAT (DMA_TRANSFER_UNIT / sizeof(float))
|
以上のように、メモリー内のブロックの位置およびサイズに応じて DMA 転送を減速、高速化するという内容になっています。自然なアライメントはそれでも必要ですが、このように変更すると値が DMA 用に適切にアラインされないとしても大きな問題にはなりません。ただし、Cell/B.E. プロセッサーでは下位 4 桁のビットがソース・レジスターと宛先レジスターとで一致していなければならないため、単一バイトの転送にはいくつかの技が取り入られています。DMA 転送は、大量の転送に十分対応可能なようにアラインされている場合には高速化し、そうでない場合には減速します。データは 4 バイトでアラインされる必要がありますが、この要件は難なく満たせるはずです。また、関数はデータが 16 バイト境界にアラインされている場合には処理速度が増し、128 バイト境界にアラインされている場合にはさらに高速化します。
複数の SPE を使用する
前のセクションでは関数の操作上の制約を取り除きました。このセクションでは、タスクを複数の SPE に分割して Cell/B.E. プロセッサーの残りの計算能力を活用します。すべてのタスクを簡単に複数の SPE に分割できるわけではありませんが、ここでは極めて容易に分割を行います。
基本的に、プログラムには総和処理 (合計と二乗の合計を出すこと)、終結処理 (合計から最終的な答えを算出すること) という 2 つの部分があります。終結処理は明白なので、PPE でも簡単にできます。一方の総和処理は、加算には可換性があるためどのようにでも分けられます。純粋な形では可換のコンポーネントを持たないアルゴリズムは、ほとんどの場合、その構成部分が可換になるように再編成することができます。
したがって、これからプログラムの SPE の部分から終結処理を除去し、合計と二乗の合計だけを加算させるようにします。プロセスの作成、処理の分割、結果の収集、そして解の終結処理を行うのは PPE の役目とします。それには、SPE アプリケーションを変更するとともに (今度は最終結果ではなく、合計を返すようにします)、ドライバー・プログラムを変更する必要があります (複数のインスタンスを作成し、応答を収集して処理するようにします)。
最初に必要な作業は、my_math_spe.h で PPE と SPE の間で受け渡しされる struct を再定義することです。
リスト 4. 部分的結果を渡す struct
#ifndef MY_MATH_SPE_H
#define MY_MATH_SPE_H
#include "speport.h"
typedef struct {
int num_values SPE_ALIGN;
float *values SPE_ALIGN;
float result_sum SPE_ALIGN;
float result_sum_squares SPE_ALIGN;
} spe_std_dev_params_t;
#endif
|
今度は SPE に対し、処理関数の動作に簡単な変更をいくつか加えます。最初に、spe_std_dev.spuc の処理関数が 1 つではなく 2 つの浮動小数値を返すように再定義します (最終計算を実行しないので、その名前も変更します)。新しいプロトタイプは以下のようになります。
void spe_calculate_sums(int num_values, float *values_ea, float *result_sum,
float *result_sum_squares);
|
同様に、関数定義を以下のように変更します。
void spe_calculate_sums(int num_values, float *values_ea, float *result_sum, float
*result_sum_squares) {
|
複数の値を返すようにしているため、戻り値を void に変更し、変数には最後にアドレスを渡すだけにします。この変更は、ファイルの先頭にあるプロトタイプ宣言と関数定義自体の両方で必要です。このように変更した後は、関数を呼び出すときに、以下のように struct で 2 つの値へのポインターを渡す必要があります。
/* Perform Task */
spe_calculate_sums(params.num_values, params.values, ¶ms.result_sum,
¶ms.result_sum_squares);
|
spe_calculate_sums の最後では、計算を終結する代わりにこれらの戻り値を設定する必要があります (終結処理は PPE が行います)。
*result_sum = sum;
*result_sum_squares = sum_squares;
return;
|
avg、variance、および std_dev 変数はもう必要ないので、これらの宣言は削除して構いません。
ここからは厄介な部分です。ここでは、PPE にジョブを適切に分割させるようにします。
以下の手順で行うことは基本的に、SPE コンテキストの配列とそれに対応するパラメーター構造体の配列を取得することです。
- 最初の実行で、すべてのコンテキストを初期化します。
- 毎回実行するたびに、問題を作業ユニットに分割した上で、処理対象の独自の値セットを使って SPE を一斉に動作させます。
- それぞれの SPE が処理を完了するまで待ちます。
- SPE の処理が完了したら、すべての値が総計されるまで個々の合計を加算します。
この時点で、最終的な計算を実行して応答を返すための必要な情報が揃います。
以下に、PPE での新しい calculate_standard_deviation 実装を記載します (my_math.c)。
リスト 5. 複数の SPE に適応させた PPE 関数
/* ... header files ... */
#ifndef USE_SPE
/* Non-SPE Version */
/* ... */
#else
/* SPE-specific includes */
#include "speport.h"
#include "my_math_spe.h"
#define NUM_SPE_THREADS 4
float calculate_standard_deviation(int num_values, float *values) {
/* Keep SPE Thread Contexts Here */
static spe_remote_function_ptr_t std_dev_func[NUM_SPE_THREADS] =
{NULL, NULL, NULL, NULL};
/* Parameters to pass to contexts */
spe_std_dev_params_t params[NUM_SPE_THREADS] SPE_ALIGN;
spe_remote_function_status_t status[NUM_SPE_THREADS] SPE_ALIGN;
/* Working variables */
int thread_idx;
int workunit_size = ROUND_UP_ALIGN(DIVIDE_ROUND_UP(num_values, NUM_SPE_THREADS),
(SPE_ALIGNMENT_FULL/sizeof(float)));
int current_workunit_size;
int remaining_values = num_values;
float *current_value_position = values;
float sum = 0.0, sum_squares = 0.0, avg, variance, std_dev;
/* Start up SPE processes if this is our first run */
if(std_dev_func[0] == NULL) {
for(thread_idx = 0; thread_idx < NUM_SPE_THREADS; thread_idx++) {
std_dev_func[thread_idx] = spe_remote_function_start("./spe_std_dev",
|-------10--------20--------30--------40--------50--------60--------70--------80--------9|
|-------- XML error: The previous line is longer than the max of 90 characters ---------|
NULL);
if(std_dev_func[thread_idx] == NULL) {
fprintf(stderr, "Error starting thread!\n");
exit(1);
}
}
}
/* Split up the Data and Call the Function */
for(thread_idx = 0; thread_idx < NUM_SPE_THREADS; thread_idx++) {
current_workunit_size = MIN(workunit_size, remaining_values);
/* Make parameters */
params[thread_idx].num_values = current_workunit_size;
params[thread_idx].values = current_value_position;
/* Call the function */
if(spe_remote_call(std_dev_func[thread_idx], ¶ms[thread_idx],
SPE_RUN_NONBLOCK, &status[thread_idx]) < 0) {
fprintf(stderr, "Error running function\n");
exit(1);
}
/* Move counter and pointer for next function call */
remaining_values -= current_workunit_size;
current_value_position += current_workunit_size;
}
/* Gather the results */
for(thread_idx = 0; thread_idx < NUM_SPE_THREADS; thread_idx++) {
/* Wait for thread to complete */
spe_wait_completion(&status[thread_idx], 0);
/* Gather data from parameters */
sum += params[thread_idx].result_sum;
sum_squares += params[thread_idx].result_sum_squares;
}
/* Finalize calculation */
avg = sum / (float)num_values;
variance = (sum_squares - (sum * avg)) / (float)num_values;
std_dev = sqrt(variance);
/* Return result */
return std_dev;
}
#endif
|
ご覧のように多少骨の折れる作業ですが、これで関数の処理はかなり高速になります。大規模なデータ・セットでは、このバージョンでの関数のパフォーマンスは PPE のみの場合の 4.25 倍に改善されます。さらに PPE で別の処理を同時に行わなければならないとしても、このバージョンでは PPE がまったく必要にならないため同時に処理することができます。
関数の最初にある workunit サイズの計算について疑問に思われるかもしれません。これは基本的に、作業ユニットを複数の利用可能な SPE に分割すると同時に、各作業ユニットを 128 の倍数個に維持して DMA での動作を最適化することを目的とした計算です。ROUND_UP_ALIGN を使用しない場合、このコードは値の個数が 16 の倍数ではないデータ・セットを処理するときに速度が半分まで落ちてしまいます。データ・セットの値の個数が 128 の倍数ではない場合は、多少の速度低下で済みます。
その他の最適化を行う
処理速度をさらに向上させるためには、以下の最適化も可能です。
- SPE 関数のダブルバッファー処理
- コードの SPE 部分での SPE ベクトル浮動小数点組み込み関数の使用
- コア・ループとスケジュール命令のアンロール
最初の最適化の方法では、SPE 関数をダブルバッファーで処理します。現時点では、DMA の完了を待機することで多大な時間を無駄にしていますが、ダブルバッファー方式を使えばデータのロードと処理を同時に行うことができます。ダブルバッファー方式についての詳細は、「Cell BE プロセッサーでのハイパフォーマンス・アプリケーションのプログラミング、第 6 回: DMA 転送での賢いバッファー管理」を参照してください。
2 番目の最適化の方法では、コードの SPE 部分で SPE ベクトル浮動小数点組み込み関数を使います。コンパイラーが自動的に並行処理可能なコードを検出し、適切なベクトル演算を作成することがありますが、この場合、繰り返し処理のたびに sum および sum_squares 変数が変更されるため、コンパイラーは演算同士が比較的独立していることを認識せずに、演算を逐次化してしまう可能性があります。SIMD 演算を明確にコード化すれば、パフォーマンスは劇的に向上します。SPE のベクトル組み込み関数についての詳細は、「Cell BE プロセッサーでのハイパフォーマンス・アプリケーションのプログラミング、第 5 回: C/C++ での SPU のプログラミング」を参照してください。
3 番目の最適化の方法は、コア・ループとスケジュール命令をアンロールすることです。ループをアンロールすることで、まだ完了していない命令に依存する演算をすべて独立させ、依存関係の計算が終わるまで待機する間に別の命令を実行できるように編成することができます。この手順は、記事「Programming high-performance applications on the Cell BE processor, Part 4: Program the SPU for performance」で説明しています。
まとめ
この記事で説明した手順を早速あなたのプロジェクトに適用してください。そうすれば、ビルド・システムやアプリケーションのその他の関数インターフェースに悪影響を与えることなく、アプリケーションを並列化して Cell B./E. プロセッサーでのパフォーマンスを最適化できるようになります。この記事では、DMA 転送サイズなどの基本的な SPE 制約のない SPE アプリケーションをプログラミングする方法、そしてデータ・セットを複数の SPE に分割して Cell B./E. の潜在能力を最大限に生かす方法について説明しました。データを分割する際の要件はアプリケーションによって多少異なりますが、この記事がその一般的な方法を理解する上で役立てたようであれば幸いです。
参考文献 学ぶために
製品や技術を入手するために
議論するために
著者について
記事の評価
|  |