この連載の前回の記事で作成したコードは、以下の基本パターンに従っています。
- SPU は、該当するデータ・セットの部分をメイン・メモリーからバッファーに移すための DMA
GETをキューに入れます。 - SPU は、バッファーがフルになるまで待機します。
- SPU は、バッファーを処理します。
- SPU は、バッファーをメイン・メモリーに戻すための DMA
PUTをキューに入れます。 - SPU は、バッファーの転送が完了するまで待機します。
- データが残っている場合は、この手順がもう一度開始されます。
この手順の問題点は、かなりのプロセッサー時間を無駄にしているということです。2 つの転送ステップに必要なのは MFC (大規模な SPE の一部)だけで、SPU はまったく関与しません。これまでに作成したコードでは、SPU は MFC が完了するまでただ待機し、それから他の処理を行います。SPUがこの待機時間にできることはきっとあるはずです。
例えて言うなら、診療所に行くようなものです。診療所に着いたら、その後待合室で長い間待たされることになるのはわかっています。そのため、待っている時間を有効に使えるものを持って行くわけです。同じ原理がプログラミングにも当てはまります。データが転送されるのを待ってプロセッサー・サイクルを無駄にするのではなく、2番目のバッファーを待機させてコードが代わりにそれを処理するようにすれば、1 つのデータ・セットが転送されるのを待つ間、別のデータ・セットを処理することができます。この新しい処理アルゴリズムは、以下のようになります。
- SPU は、該当するデータ・セットの部分をメイン・メモリーからバッファー 1 に移すための DMA
GETをキューに入れます。 - SPU は、該当するデータ・セットの部分をメイン・メモリーからバッファー 2 に移すための DMA
GETをキューに入れます。 - SPU はバッファー #1 がフルになるまで待機します。
- SPU はバッファー #1 を処理します。
- SPU は、(a) バッファー #1 の内容を転送するための DMA PUT をキューに入れ、(b)
PUTの後に実行してメイン・メモリーからの次のデータ部分でバッファーを補充するための DMAGETBをキューに入れます。 - SPU はバッファー #2 がフルになるまで待機します。
- SPU はバッファー #2 を処理します。
- SPU は、(a) バッファー #2 の内容を転送するための DMA PUT をキューに入れ、(b)
PUTの後に実行してメイン・メモリーからの次のデータ部分でバッファーを補充するための DMAGETBをキューに入れます。 - すべてのデータが処理されるまで、ステップ 3 から繰り返します。
- すべてのバッファーが完了するまで待機します。
このアルゴリズムには当然、このアルゴリズムでは対処できない問題が持ち上がります。まず、それぞれのループの繰り返しで 2 つのバッファーを処理しているため、バッファーがなくなると不必要な作業を大量に行う可能性があるという点に着目してください。それに対して考えられるのは、早期の終了に対して複数のif 文を挿入しておき、データがなくなった時点でバッファーの補充プロセスを停止するという方法です。ただし、このプログラムの場合には賛成できかねます。この方法だと、繰り返しごとにかなりの追加処理が発生するからです。コードが大規模なデータ・セットを処理する場合には、セットアップや分解にかかるコストよりも、繰り返しごとに発生するコストのほうが、はるかに重要になります。そのような理由から、私はできるだけSPE 上での条件付きの処理をなくして分岐を避けるようにしています。バッファー処理の場合、MFC はゼロ・サイズのデータ要求をノーオペレーションとして処理するため、読み取るデータがないとしてもそのまま要求を発行できます。一方、実際のバッファー処理では、サイズがゼロのバッファーでも関数で完全に処理することができます。ただ単にリターンすればいいだけの話だからです。つまり、以上のすべてのケースはすでに対処されているので、追加の分解ステップを省くための分岐はデフォルトのケースを遅くするだけです。
もう 1 つの問題は、競合を起こさずにどのように PUT と GET を同じバッファーでスケジュールするかです。それぞれのデータ処理ステップの後、データをメイン・メモリーに転送するための PUT と次のデータの塊を取得するための GET を両方セットアップするわけですが、MFC は任意の順序で要求を選択して処理するようにデフォルト設定されています。そのような条件下で、特定の順序付けを行うには一体どうすればいいのでしょうか。前回の記事で説明したように、その答えはバリアとフェンスを使うことです。要求にフェンスを設定すると、同じタグ・グループ内に含まれる発行済みの MFC 要求がすべて処理されてから、現行の要求が処理されることになります。ただし、以降の転送については、順序付けの指定をしません。一方、バリアはフェンスと同様ですが、以前の要求と以降の要求を基準とした順序付けが行われるという点が異なります。つまり、2つ目の要求をフェンスあるいはバリアを使って送信すれば、MFC に正しい順序で要求を処理させることができます。これらの要求は同じタグ・グループ内に含まれるため、バッファーを使用する時期になったら、そのタグ・グループ全体が完了するのを待てばいいだけのことです。GETB、PUTB、GETF、および PUTF はいずれも単一のバッファーに対する主要なフェンスおよびバリア関連の DMA コマンドです。
それではここで、この連載で扱っている大文字変換コードにこのアルゴリズムを適用する方法について考えてみましょう。参考のため、convert_driver_c.c の元のコードを以下に記載します。
リスト 1. 単一バッファー方式の元の MFC 転送プログラム
#include <spu_intrinsics.h>
#include <spu_mfcio.h> /* constant declarations for the MFC */
typedef unsigned long long uint64;
typedef unsigned int uint32;
void convert_buffer_to_upper(char *conversion_buffer, int current_transfer_size);
#define MAX_TRANSFER_SIZE 16384
char conversion_buffer[MAX_TRANSFER_SIZE];
typedef struct {
uint32 length __attribute__((aligned(16)));
uint64 data __attribute__((aligned(16)));
} conversion_structure;
int main(uint64 spe_id, uint64 conversion_info_ea) {
conversion_structure conversion_info; /* Information about the data from the PPE */
/* New variables to keep track of where we are in the data */
uint32 remaining_data; /* How much data is left in the whole string */
uint64 current_ea_pointer; /* Where we are in system memory */
uint32 current_transfer_size; /* How big the current transfer is (may
* be smaller than MAX_TRANSFER_SIZE) */
/* We are only using one tag in this program */
mfc_write_tag_mask(1<<0);
/* Grab the conversion information */
mfc_get(&conversion_info, conversion_info_ea, sizeof(conversion_info), 0, 0, 0);
spu_mfcstat(MFC_TAG_UPDATE_ALL); /* Wait for Completion */
/* Setup the loop */
remaining_data = conversion_info.length;
current_ea_pointer = conversion_info.data;
while(remaining_data > 0) {
/* Determine how much data is left to transfer */
if(remaining_data < MAX_TRANSFER_SIZE)
current_transfer_size = remaining_data;
else
current_transfer_size = MAX_TRANSFER_SIZE;
/* Get the actual data */
mfc_getb(conversion_buffer, current_ea_pointer, current_transfer_size, 0, 0, 0);
spu_mfcstat(MFC_TAG_UPDATE_ALL);
/* Perform the conversion */
convert_buffer_to_upper(conversion_buffer, current_transfer_size);
/* Put the data back into system storage */
mfc_putb(conversion_buffer, current_ea_pointer, current_transfer_size, 0, 0, 0);
/* Advance to the next segment of data */
remaining_data -= current_transfer_size;
current_ea_pointer += current_transfer_size;
}
spu_mfcstat(MFC_TAG_UPDATE_ALL); /* Wait for Completion */
}
|
このプログラムには、連載第 5 回の convert_buffer_c.c ファイルと第 3 回の ppu_dma_main.c ファイルを追加する必要があります (この記事では、後で別のバージョンも記載します)。以前の記事と同じようにコンパイルして実行してください (これらのビルド・コマンドはこの記事のすべてのサンプルに使えます)。
spu-gcc convert_buffer_c.c convert_driver_c.c -o spe_convert embedspu -m64 convert_to_upper_handle spe_convert spe_convert_csf.o gcc -m64 spe_convert_csf.o ppu_dma_main.c -lspe -o dma_convert ./dma_convert |
このプログラムをダブルバッファー方式にするには、コードに多少のリファクタリングが必要です。まずは、バッファー固有のデータを 1 つにまとめておいてください。そのデータとそれぞれのバッファーは、以下のように関連付ける必要があります。
- バッファー自体のアドレス
- バッファーの充てんを開始した実効アドレス
- 処理中のデータのサイズ
以上のバッファー固有の全情報を保持するには、以下の構造体を作成します。
struct {
uint64 effective_address __attribute__((aligned(16)));
uint32 size __attribute__((aligned(16)));
char data[MAX_TRANSFER_SIZE] __attribute__((aligned(16)));
} buffer;
|
後は、この 2 つのバッファーのグローバル配列を宣言するだけです。
buffer buffers[2]; |
ここで、変換プロセスを以下の 2 つの関数コールに分割します。
- データ・バッファーのロード開始
- バッファー内のデータの待機、処理、そして再保存
このように分割するわけは、この 2 つの呼び出しは、再編成が必要な独立した単位だからです。データ・ロードの開始は、プログラムの開始時に呼び出されなければならないので、固有の関数に切り離さなければなりません。その結果、MFCコードのダブルバッファー・バージョンは以下のようになります (同じく convert_driver_c.c です)。
リスト 2. ダブルバッファー方式の MFC 転送
#include <spu_intrinsics.h>
#include <spu_mfcio.h>
/* Constants */
#define MAX_TRANSFER_SIZE 16384
/* Data Structures */
typedef unsigned long long uint64;
typedef unsigned int uint32;
typedef struct {
uint32 length __attribute__((aligned(16)));
uint64 data __attribute__((aligned(16)));
} conversion_structure;
typedef struct {
uint32 size __attribute__((aligned(16)));
uint64 effective_address __attribute__((aligned(16)));
char data[MAX_TRANSFER_SIZE] __attribute__((aligned(16)));
} buffer;
/* Global Variables */
buffer buffers[2];
/* Utility Functions */
inline uint32 MIN(uint32 a, uint32 b) {
return a < b ? a : b;
}
inline void wait_for_completion(uint32 mask) {
mfc_write_tag_mask(mask);
spu_mfcstat(MFC_TAG_UPDATE_ALL);
}
inline void load_conversion_info(uint64 cinfo_ea, uint64 *data_ea, uint32 *data_size) {
conversion_structure cinfo;
mfc_get(&cinfo, cinfo_ea, sizeof(cinfo), 0, 0, 0);
wait_for_completion(1<<0);
*data_size = cinfo.length;
*data_ea = cinfo.data;
}
/* Processing Functions */
inline void initiate_transfer(uint32 buf_idx, uint64 *current_ea_pointer,
uint32 *remaining_data) {
/* Setup buffer information */
buffers[buf_idx].size = MIN(*remaining_data, MAX_TRANSFER_SIZE);
buffers[buf_idx].effective_address = *current_ea_pointer;
/* Initiate transfer using the buffer index as the DMA tag */
mfc_getb(buffers[buf_idx].data, buffers[buf_idx].effective_address,
buffers[buf_idx].size, buf_idx, 0, 0);
/* Move the data pointers */
*remaining_data -= buffers[buf_idx].size;
*current_ea_pointer += buffers[buf_idx].size;
}
inline void process_and_put_back(uint32 buf_idx) {
wait_for_completion(1<<buf_idx);
/* Perform conversion */
convert_buffer_to_upper(buffers[buf_idx].data, buffers[buf_idx].size);
/* Initiate the DMA transfer back using the buffer index as the DMA tag */
mfc_putb(buffers[buf_idx].data, buffers[buf_idx].effective_address,
buffers[buf_idx].size, buf_idx, 0, 0);
}
/* Main Code */
int main(uint64 spe_id, uint64 conversion_info_ea) {
uint32 remaining_data;
uint64 current_ea_pointer;
load_conversion_info(conversion_info_ea, ¤t_ea_pointer, &remaining_data);
/* Start filling buffers to prepare for loop (loop assumes both buffers have
* data coming in) */
initiate_transfer(0, ¤t_ea_pointer, &remaining_data);
initiate_transfer(1, ¤t_ea_pointer, &remaining_data);
do {
/* Process buffer 0 */
process_and_put_back(0);
initiate_transfer(0, ¤t_ea_pointer, &remaining_data);
/* Process buffer 1 */
process_and_put_back(1);
initiate_transfer(1, ¤t_ea_pointer, &remaining_data);
} while(buffers[0].size != 0);
wait_for_completion(1<<0|1<<1);
}
|
このコードが処理するのはバッファーだけなので、大文字変換に固有の関数コール以外にコードはほとんどありません。そのため、他のコンテキストでも、ほとんどそのまま再使用できます。
前のセクションで用いた一般的な概念は、「ソフトウェア・パイプライン」と呼ばれます。つまり、一連の処理を、実行中にオーバーラップ可能な複数のステージに分割してスループットを最大限にするという方法です。この例のパイプラインに実際にあるのは、ロード/保存、そして処理という2 つのステージだけです。ただし、この概念を他の問題に一般化する場合、確立できる「パイプライン・ステージ」はどんな数にもなり得ます。基本的な考えは、パイプラインごとに固有の処理用バッファーを持たせ、一度に各バッファーを1 ステージ処理するというものです。ソフトウェア・パイプラインが使用するバッファーが 3 つ以上の場合は、マルチバッファー方式と呼ばれます。SPUでは、(上記の例のような) 2 ステージ式パイプラインが、たいていのアプリケーションに最適に働きます。その理由は、データの移動はプロセッサーではなくMFC によって処理されるため、2 つのパイプライン・ステージが並行して動作することが可能だからです。SPE プログラミングにおいては、このような処理とデータ転送の並行性が2 ステージ式パイプラインを優位にしています。
パイプライン・ステージの他にも、追加バッファーを利用できる方法はあります。その代表格は、MFC で大量のデータ転送を開始し、それから MFCに処理順序を決定させるという方法です。例えば、メモリーのある領域が現在スワップ・スペースとなっていて、別の領域がメモリー内にあるとします。この場合、MFCに未処理の転送を大量に置くことで、MFC が最適な転送順序を決定することが可能になります。この方法は、バス・コンテンションの問題を取り除くのにも役立ちます。バスが占有されている状態になると、プログラムはバスが解放されるまで待つ代わりに追加バッファーを処理し、バスが解放されてから追加バッファーを補充することができるからです。以下の特定のプログラムでは、バッファーをこの方法で処理しても実行時間には大して影響しませんが、データ・セットによっては悪影響を及ぼします。それでもなお、MFC_TAG_UPDATE_ANY の具体的な使用方法をはじめ、バッファー処理の別の手法を説明するには有益な例です。
この新しいプロセスは、以下のようになります。
- すべてのバッファーに対する DMA
GETをキューに入れます。転送しているバイトがゼロではないバッファーそれぞれに、「充てん中」のマークを付けます。各バッファーは固有の DMA タグID を取得します。 - 「充てん中」のマークが付いているバッファーがない場合は、すべての DMA
PUT操作が完了するまで待機してから終了します。 - 「充てん中」のマークが付いたいずれか 1 つのバッファーがフルになるまで待機します。
- バッファーを処理します。
- バッファーをメイン・メモリーに戻すための DMA
PUTをキューに入れます。 - 既存のデータが再び保存された後に追加データを補充するための DMA
GETBをキューに入れます。 - 前のステップでの DMA 転送が少なくとも 1 バイトを対象としている場合 (つまり、転送されていないデータが残っている場合)、バッファーに「充てん中」のマークを付けます。
- ステップ 2 に戻ります。
このアルゴリズムでは、バッファーが処理される順序はかなり不確定になります。ここでとりわけ難問となるのは、分岐の数を最小限にすることです。分岐の原因としては、バッファーを「充てん中」としてマークを付けるかどうかの決定、そして使用可能なバッファーを検出するためのバッファーのポーリングが考えられます。どちらの場合の分岐にしても、SPUイントリンシックを慎重に選択し、優れたデータ構造を設計することによって簡単に回避することができます。
バッファーが使用可能になるまで待機するのは、実はわけのないことです。興味の対象とするバッファーのマスクがあれば、spu_mfcstat(MFC_STAT_UPDATE_ANY) を呼び出すことができます。この関数は、保留中の操作がない (言い換えると、すべての操作が完了した) すべてのバッファーのマスクを返し、さらに少なくとも1 つのバッファーが利用可能になるまで待機します。これは DMA 転送用ではなく、C ライブラリーの select 関数の特殊バージョンのようなものだと考えてください。さて、この呼び出しによって利用可能なすべてのバッファーが返されることになりますが、必要なバッファーは1 つだけです。そこで、マスクを単一の索引に変換し、その索引を使用して処理対象のバッファーを指定できるようにする必要が出てきますが、分岐は使えません。これを完璧にこなすのが、SPUの命令 clz (先行するゼロの数のカウント。C 言語の組み込み関数名は spu_cntlz) です。先行するゼロをカウントし、それを 31 から減算することにより、結果のマスクを単一の索引に変換することができます。これを行うアセンブリー言語の命令シーケンスは、以下のようになります。
#assume the mask is in $10 #Count the leading zeroes clz $11, $10 #Subtract that from 31 sfi $12, $11, 31 #$12 now has the index of the buffer we want to use. |
C 言語では、以下のように書き換えられます。
/* buffers_completed holds the mask */
spu_extract(
spu_sub(
(int32)31,
spu_cntlz(
spu_promote(
(uint32)buffers_completed, 0
)
)
),
0
);
|
上記では最初に利用可能なバッファーしか取得していませんが、他に利用可能なバッファーがあれば、以降のループの繰り返しで返されます。
今度は、現在「充てん中」のどのバッファーをどのように保存するかを決定して、これらのフラグを分岐なしで設定できるようにしなければなりません。最善の方法はタグ・マスクとして保存し、spu_mfcstat のマスクとしてそのまま使用できるようにすることです。ですが、分岐を使わずにこれらのビットを条件付きで設定するとなると、そう簡単にはいきません。アセンブリー言語のバージョンは、以下のようになります。
#$10 holds our buffer mask #$11 holds the size of the last transfer #$12 holds the index of the current buffer #Convert the current buffer index to a bit for a bit mask (stored in $14) il $13, 1 shl $14, $13, $12 #Turn the bit off in the original mask xor $10, $10, $14 #is the last transfer greater than zero? (answer stored in $15) cgti $15, $11, 0 #Turn the bit on or off based the previous result (answer stored in $14) and $14, $14, $15 #Turn the bit on based on our existing results or $10, $10, $14 |
上記を適切にスケジュールすると、10 サイクルに分割することができます。ただし、これはコンパイラーが対処する類の操作なので、実際にはこのように作成するだけで、後はコンパイラーが適切に最適化してくれます。
/* clear the bit */ *buffers_with_data &= ~(1<<buf_idx); /* Set the bit conditionally */ *buffers_with_data |= (buffers[buf_idx].size > 0 ? (1<<buf_idx) : 0); |
このプログラムでは、問題は難なく並列化できるので、実際には SPU のローカル・ストアでサポート可能な限りのバッファー数を持つことができます。このプログラムでは、各バッファーで(理論上) アクティブになる DMA 転送は 2 つ (保存とロード) なので、プログラムが使用できる最大バッファー数は 8 つとなります。MFCが処理できる保留中の DMA 操作は、16 だけだからです。この制限数を超えてもプログラムの論理操作には影響しませんが、17 番目の DMA操作を追加すると、未処理の操作のいずれかが完了するまで SPU が停止します。SPU が再開した時点で、プログラムがキューに入っている次の操作を続行できるようになります。
この新しいバージョンのコードは、以下のとおりです (同じく convert_driver_c.c です)。
リスト 3. マルチバッファー方式のMFC 転送
#include <spu_intrinsics.h>
#include <spu_mfcio.h>
typedef unsigned long long uint64;
typedef unsigned int uint32;
typedef int int32;
/* Constants */
#define MAX_TRANSFER_SIZE 16384
#define NUM_BUFFERS 8 /* The MFC supports only 16 queued transfers,
* and we have up to two active per buffer */
/* Data Structures */
typedef struct {
uint32 length __attribute__((aligned(16)));
uint64 data __attribute__((aligned(16)));
} conversion_structure;
typedef struct {
uint32 size __attribute__((aligned(16)));
uint64 effective_address __attribute__((aligned(16)));
char data[MAX_TRANSFER_SIZE] __attribute__((aligned(16)));
} buffer;
buffer buffers[NUM_BUFFERS];
/* Utility functions */
inline uint32 MIN(uint32 a, uint32 b) {
return a < b ? a : b;
}
/* Processes the buffer, queues a DMA transfer to put the data back, and clears out
* the "waiting for data" bit in buffers_with_data */
inline void process_and_put_back(uint32 buf_idx, uint32 *buffers_with_data) {
convert_buffer_to_upper(buffers[buf_idx].data, buffers[buf_idx].size);
mfc_putb(buffers[buf_idx].data, buffers[buf_idx].effective_address,
buffers[buf_idx].size, buf_idx, 0, 0);
*buffers_with_data &= ~(1<<buf_idx); /* Clear out bit for this buffer */
}
/* Queues up a DMA GET transfer, ad, if there is any data to transfer, sets
* the appropriate bit in buffers_with_data to indicate that we are waiting
* for data in this buffer */
inline void initiate_transfer(uint32 buf_idx, uint32 *buffers_with_data,
uint64 *current_ea_pointer, uint32 *remaining_data) {
/* Setup buffer */
buffers[buf_idx].size = MIN(*remaining_data, MAX_TRANSFER_SIZE);
buffers[buf_idx].effective_address = *current_ea_pointer;
/* Move Data Pointers */
*remaining_data -= buffers[buf_idx].size;
*current_ea_pointer += buffers[buf_idx].size;
/* Initiate transfer (does nothing if there is no data) */
mfc_get(buffers[buf_idx].data, buffers[buf_idx].effective_address,
buffers[buf_idx].size, buf_idx, 0, 0);
/* Set the "Buffer Waiting for Data" bit only if there is data to read */
*buffers_with_data |= (buffers[buf_idx].size > 0 ? (1<<buf_idx) : 0);
}
/* Waits for all of the given buffers to complete */
inline void wait_for_completion(uint32 mask) {
mfc_write_tag_mask(mask);
spu_mfcstat(MFC_TAG_UPDATE_ALL);
}
/* Loads information about the whole conversion process */
inline void load_conversion_info(uint64 conversion_info_ea, uint64 *current_ea_pointer,
uint32 *remaining_data) {
conversion_structure conversion_info;
mfc_get(&conversion_info, conversion_info_ea, sizeof(conversion_info), 0, 0, 0);
wait_for_completion(1<<0);
*remaining_data = conversion_info.length;
*current_ea_pointer = conversion_info.data;
}
/* Returns the index of the first buffer with data available*/
inline uint32 get_next_buffer(uint32 buffers_with_data) {
uint32 buffers_completed; /* This will contain a mask of buffers whose
* transfers have completed */
/* These are the buffers to look for */
mfc_write_tag_mask(buffers_with_data);
/* Wait for at least one buffer to come available */
buffers_completed = spu_mfcstat(MFC_TAG_UPDATE_ANY);
/* Use "count leading zeros" to determine the buffer index from
* the buffers_completed mask */
return spu_extract(
spu_sub(
(int32)31,
spu_cntlz(
spu_promote((uint32)buffers_completed, 0)
)
),
0
);
}
/* Steps are numbered according to the description in this section */
int main(uint64 spe_id, uint64 conversion_info_ea) {
uint32 remaining_data;
uint64 current_ea_pointer;
uint32 buffers_with_data = 0; /* This is the bit mask for each buffer waiting on data,
* used for spu_mfcstat in the main loop */
uint32 all_buffers = 0; /* This is used to wait on all remaining transfers at
* the end of the program*/
uint32 current_buffer_idx;
load_conversion_info(conversion_info_ea, ¤t_ea_pointer, &remaining_data);
/* Step 1: Get all buffers loading (because NUM_BUFFERS is a constant, the compiler
* should unroll the loop all the way) */
for(current_buffer_idx = 0; current_buffer_idx < NUM_BUFFERS; current_buffer_idx++) {
initiate_transfer(current_buffer_idx, &buffers_with_data,
¤t_ea_pointer, &remaining_data);
all_buffers |= 1<<current_buffer_idx;
}
/* Step 2: Continue while there are still buffers pending */
while(buffers_with_data != 0) {
/* Step 3: Get the next buffer that gets filled */
current_buffer_idx = get_next_buffer(buffers_with_data);
/* Steps 4 and 5: Process the buffer and queue up a DMA transfer back to main memory */
process_and_put_back(current_buffer_idx, &buffers_with_data);
/* Steps 6 and 7: Queue up a buffer reload, and mark the buffer as "filling"
* (by setting the appropriate bit in remaining_data) */
initiate_transfer(current_buffer_idx, &buffers_with_data,
¤t_ea_pointer, &remaining_data);
}
/* Wait for all PUTs to complete */
wait_for_completion(all_buffers);
}
|
このコードでは特に、メイン・ループと convert_buffer_to_upper 関数を呼び出すところだけが必須の分岐となるようにしてあります。このコードで他に考えられる分岐は、インライン関数 (これは当然、コンパイラーによってインライン化されたものです)、あるいはコンパイラーによって簡単に除去できる分岐のいずれかです。副次作用のないコードや非インライン関数の呼び出しを使って三項演算? : にすることができる分岐は、ほとんどすべて、コンパイラー (GCC または XLC) で分岐の除去ができます。
これまで SPE プログラムのテストに使ってきた PPE プログラムは 1 つのバッファーしか使用しません。つまり、ここで説明した最適化を利用することはないので、パフォーマンスの違いを調べることは困難です。複数のバッファーを使用するプログラムが、大きなデータ・セットではどのように動作するかを調べるには、大規模なデータ・セットを使用してSPU を計時する以下のバージョンの ppu_dma_main.c を使います。
リスト 4. 大規模なデータ・セットをテストするドライバー・プログラム
#include <stdio.h>
#include <libspe.h>
#include <errno.h>
#include <string.h>
#include <sys/time.h>
#include <malloc.h>
/* Size of Buffer -- MUST be a multiple of 16 */
#define BUF_SIZE (16 * 200000)
/* embedspu actually defines this in the generated object file,
we only need an extern reference here */
extern spe_program_handle_t convert_to_upper_handle;
/* This is the parameter structure that our SPE code expects */
/* Note the alignment on all of the data that will be passed to the SPE is 16-bytes */
typedef struct {
int length __attribute__((aligned(16)));
unsigned long long data __attribute__((aligned(16)));
} conversion_structure;
int main() {
int status = 0;
int i;
struct timeval initial_time, final_time;
/* Create the string on an aligned boundary */
char *str = memalign(16, BUF_SIZE);
/* Fill the string with data */
for(i = 0; i < BUF_SIZE - 1 ; i++) {
str[i] = 'a' + i % 26;
}
/* Null-terminate string */
str[BUF_SIZE - 1] = '\0';
/* Create conversion structure on an aligned boundary */
conversion_structure conversion_info __attribute__((aligned(16)));
/* Set the data elements in the parameter structure */
conversion_info.length = BUF_SIZE; /* add one for null byte */
conversion_info.data = (unsigned long long)str;
/* Check starting time */
gettimeofday(&initial_time, NULL);
/* Create the thread and check for errors */
speid_t spe_id = spe_create_thread(0, &convert_to_upper_handle,
&conversion_info, NULL, -1, 0);
if(spe_id == 0) {
fprintf(stderr, "Unable to create SPE thread: errno=%d\n", errno);
return 1;
}
/* Wait for SPE thread completion */
spe_wait(spe_id, &status, 0);
/* Check final time */
gettimeofday(&final_time, NULL);
/* Print SPU execution time */
fprintf(stderr, "%llu microseconds\n",
((long long)final_time.tv_sec * 1000000 + final_time.tv_usec) -
((long long) initial_time.tv_sec * 1000000 + initial_time.tv_usec));
/* Print out result - uncomment if you really want to see it*/
//printf("The converted string is: %s\n", str);
return 0;
} |
この記事では、SPE でバッファーを管理する 2 つの手法、ダブルバッファー方式とマルチバッファー方式について説明しました。複数のバッファーを同時にアクティブにできるように既存のコードを拡張する方法と、MFCがバッファーを補充する順序を決定できるようにする方法を理解してもらえたはずです。これらの方法は、コードを構成する各段階で不要な分岐を導入しないように確認しながら行われます。
- 連載「Cell BE プロセッサーでのハイパフォーマンス・アプリケーションのプログラム」の他の記事を参照してください。
- MIT では PS 3 プログラミングの講習を開催しました。この講習では、IBM の Michael Perrone がとりわけ EIB (ElementInterconnect Bus、SPE とメイン・メモリーを接続するバス) が動作する仕組みとその制約事項を詳しく説明した素晴しい講演を行っています (PDF 版)。
- この講習では、IBM の Rodric Rabbah も並列プログラミングの設計パターンに関する 2 回の講演で、この記事で取り上げたソフトウェア・パイプラインのコンセプトを紹介しています(第 1 回、第 2 回 とも PDF 版です)。
-
IBM Cell/B.E. workshopの補足記事 (PDF 版) でも、SPU でのダブルバッファー方式の例を紹介しています (DMA 転送に関する豊富な情報も記載されています)。
- C/C++ 組み込み関数の一式は、PPU & SPU C/C++ Language Extension Specificationに記載されています。
-
IBM microNewsに加入して Cell BE の最新情報を入手してください。
Jonathan BartlettはLinuxアセンブリー言語を使ったプログラミングの入門書Programming from the Ground Upの著者です。New Media Worxでの主席開発者であり、顧客向けにWebアプリケーションや、ビデオ、キヨスク、デスクトップなどのアプリケーションを開発しています。連絡先はjohnnyb@eskimo.comです。