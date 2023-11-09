Apache Kafkaは、高性能でスケーラブルなイベント・ストリーミング・プラットフォームです。Kafkaの可能性を最大限に解き放つには、アプリケーションの設計を慎重に検討する必要があります。性能が低かったり、最終的に拡張性の壁にぶつかったりするKafkaアプリケーションは、あまりにも簡単に作成できます。2015年以来、IBMは、IBM® Cloud上で実行されるフルマネージドApache KafkaサービスであるIBM® Event Streamsサービスを提供しています。それ以来、このサービスは、多くの顧客とIBM社内のチームが、彼らが作成したKafkaアプリケーションの拡張性と性能の問題を解決するのに役立っています。

この記事では、Kafkaの一般的な問題のいくつかについて説明し、アプリケーションで拡張性の問題の発生を回避する方法に関する推奨事項をいくつか提供します。

1. ネットワークのラウンドトリップの待ち時間を最小限に抑える

特定のKafkaオペレーションは、クライアントがブローカーにデータを送信し、応答を待つことで機能します。ラウンドトリップには10ミリ秒かかる場合があり、高速のように聞こえますが、毎秒最大100回の操作に制限されています。このため、この種のオペレーションは可能な限り回避することをお勧めします。幸い、Kafkaクライアントには、このような往復時間の待ち時間を回避する方法が用意されています。それらを確実に活用するだけでよいのです。

スループットを最大化するためのヒント：

送信されたすべてのメッセージが成功したかどうかをチェックしないでください。KafkaのAPIを使用すると、メッセージの送信をブローカーによって正常に受信したかどうかの確認から切り離すことができます。メッセージの受信確認を待つと、アプリケーションにネットワークのラウンドトリップレイテンシーが生じる可能性があるため、可能な限りレイテンシーを最小限に抑えるようにしてください。つまり、すべてのメッセージが受信されたことを確認する前に、できるだけ多くのメッセージを送信することになります。あるいは、メッセージ配信が成功したかどうかのチェックをアプリケーション内の別の実行スレッドに委任して、より多くのメッセージを送信するのと並行して実行できるようにすることもできます。 各メッセージの処理後にオフセット・コミットを使用しないでください。オフセット（同期）のコミットは、サーバーとのネットワーク・ラウンドトリップとして実装されます。オフセットのコミット頻度を減らすか、非同期オフセット・コミット機能を使用して、処理するすべてのメッセージに対してこのラウンドトリップの料金を支払う回避をします。オフセットのコミット頻度が低いと、アプリケーションが障害を起こした場合に再処理する必要があるデータが多くなる可能性があることに注意してください。

上記を読んで、「ああ、そのことによってアプリケーションがもっと複雑になるのではないか？」と考えた方は、答えはイエスであり、おそらくそのような状況になるでしょう。スループットとアプリケーションの複雑さの間にはトレードオフがあります。ネットワークのラウンドトリップ時間で特に深刻な落とし穴となるのは、この制限に達すると、さらなるスループットの向上を実現するために大規模なアプリケーションの変更が必要になる可能性があることです。

2. 処理時間の増加を利用者障害と誤解しないようにする

Kafkaの主要な機能の1つは、消費するアプリケーションの「ライブネス」を監視し、失敗した可能性のあるアプリケーションを切断することです。これは、ブローカーが各コンシューミング・クライアントが最後に「ポーリング」（より多くのメッセージを求めることを意味するKafkaの用語）と呼んだ時期を追跡することで機能します。クライアントが十分な頻度でポーリングしない場合、そのクライアントが接続されているブローカーは、クライアントが失敗したに違いないと判断し、クライアントを切断します。これは、問題を経験していないクライアントが介入し、失敗したクライアントから作業を再開できるように設計されています。

ただし残念ながら、Kafkaブローカーは、受信したメッセージの処理に長い時間かかっているクライアントと、実際に失敗したクライアントを区別することができません。次のことをループするコンシューミング・アプリケーションを考えてみましょう。1）ポーリングを呼び出し、一連のメッセージを返します。または2）バッチ内の各メッセージを処理し、各メッセージの処理に1秒かかります。

この利用者が10件のメッセージのバッチを受信している場合、ポーリングの呼び出し間隔は約10秒になります。デフォルトでは、Kafkaはクライアントを切断するまでにポーリング間に最大300秒（5分）を許可するため、このシナリオではすべてが問題なく機能します。しかし、アプリケーションが使用しているトピックに関してメッセージのバックログが蓄積し始め、非常に忙しい日にはどうなるでしょうか。各ポーリング呼び出しから10件のメッセージを返すのではなく、アプリケーションは500個のメッセージを取得します（デフォルトでは、これがポーリングの呼び出しによって返されるレコードの最大数です）。これで、Kafkaがアプリケーション・インスタンスが失敗したと判断し、切断するのに十分な処理時間がかかります。これは悪いニュースです。

それがさらに悪化する可能性があることを知ってうれしく思います。一種のフィードバック・ループが発生する可能性があります。十分な頻度でポーリングを実行しないことからKafkaがクライアントの切断を開始すると、メッセージを処理するアプリケーションのインスタンスが減少します。トピックについての大量のメッセージが未処理のまま存在する可能性が高まり、より多くのクライアントが大規模なメッセージを取得し、その処理に時間がかかりすぎる可能性が高まります。最終的には、消費側アプリケーションのすべてのインスタンスが再起動ループに含まれ、有用な作業は行われません。

こうした事態を回避するためにできることとは。

ポーリング呼び出し間の最大時間は、Kafka利用者の「max.poll.interval.ms」を使用して構成します。1つのポーリングで返されるメッセージの最大数も、「max.poll.records」オプションを使用して設定できます。構成します。経験則として、優先順位の「max.poll.records」を減らして「max.poll.interval.ms」を増やすことを目指してください。最大ポーリング間隔を大きく設定すると、Kafkaが実際に失敗した利用者の特定に時間がかかるためです。 Kafkaの利用者には、メッセージの流れを一時停止して再開するように指示することもできます。消費を一時停止すると、ポーリング・メソッドはメッセージを返しなくなりますが、クライアントが失敗したかどうかを判断するために使用されるタイムリーはリセットされます。一時停止と再開は、次の両方を行う場合に便利な施策です。a）個々のメッセージの処理に時間がかかる可能性があることが予想されること。b）Kafkaは、個々のメッセージの処理の過程でクライアントの障害を検知できるようにしたいと考えています。 Kafkaクライアント・メトリックの有用性をお見逃しなく。メトリクスのトピックはそれ自体で記事全体を埋めることができますが、この文脈では、利用者はポーリングの平均時間と最大時間の両方のメトリクスを公開します。これらのメトリクスを監視することで、Kafkaから受信した各メッセージの処理に予想以上の時間がかかっている理由がダウンストリーム・システムにある状況を特定することができます。

この記事の後半では、利用者障害のトピックに戻り、利用者グループのリバランシングを引き起こす方法と、これがもたらす破壊的な影響について説明します。

3. アイドル状態の利用者のコストを最小化する

内部では、Kafka利用者がメッセージを受信するために使用するプロトコルは、Kafkaブローカーに「取得」リクエストを送信することで機能します。このリクエストの一環として、クライアントは、ブローカーが空の応答を送信するまでの時間など、差し迫ったメッセージがない場合にブローカーがすべきことを示します。デフォルトでは、Kafkaの利用者はブローカーに最大500ミリ秒（「fetch.max.wait.ms」によって制御）、少なくとも1バイトのメッセージ・データが使用可能になるようにする（「fetch.min.bytes」で制御する構成）です。

500ミリ秒を待つのは不合理なことではありませんが、アプリケーションにほとんどアイドル状態の利用者があり、インスタンスが5,000件まで拡張された場合、1秒あたり2,500件のリクエストがあり、まったく何も行わないことになります。これらのリクエストはそれぞれ、ブローカーの処理にCPU時間を要し、極端な場合には、有益な作業を実行したいKafkaクライアントの性能と安定性に影響を与える可能性があります。

通常、Kafkaのスケーリング・アプローチは、ブローカーをさらに追加し、新旧の両方のすべてのブローカーにわたってトピック・パーティションのバランスを均等に再調整することです。残念ながら、クライアントがKafkaに不必要なフェッチ・リクエストを大量に送信している場合、このアプローチは役に立たない可能性があります。各クライアントは、クライアントがメッセージを利用しているトピック・パーティションをリードするすべてのブローカーにフェッチ・リクエストを送信します。そのため、Kafkaクラスターを拡張し、パーティションを再分散した後でも、クライアントのほとんどがブローカーのほとんどに取得リクエストを送信する可能性があります。

そこで、できることは何でしょうか。

Kafka利用者構成を変更すると、この影響を軽減できます。到着すぐにメッセージを受信したい場合は、「fetch.min.bytes」をデフォルトの1のままにする必要があります。一方、「fetch.max.wait.ms」には設定の値を大きくすることができます。そうすることで、アイドル状態の利用者によるリクエストの数が減ります。 さらに広いスコープでは、アプリケーションには潜在的に数千のインスタンスが必要で、各インスタンスがKafkaから非常に低頻度で消費する必要がありますか。それにはそれなりの理由があるかもしれませんが、Kafkaをより効率的に使用するように設計できる方法があるかもしれません。次のセクションでは、これらの考慮事項のいくつかに触れます。

4. トピックとパーティションの適切な数を選択する

他のパブリッシュ・サブスクライブ・システム（Message Queuing Telemetry Transport、略してMQTT）を使用している背景からKafkaにアクセスする場合、Kafkaトピックは非常に軽量で、ほとんど一時的なものであると予想されるかもしれません。そうではありません。Kafkaは、数千ものトピックにはるかに満足しています。Kafkaのトピックも比較的長く続くと予想されています。単一の返信メッセージを受信するようにトピックを作成し、そのトピックを削除するなどの慣行は、Kafkaでは一般的ではなく、Kafkaの強みを発揮しません。

代わりに、長く続くトピックを計画します。おそらく、アプリケーションやアクティビティーのライフサイクルを共有します。また、トピックの数を数百、場合によっては数千に制限することも目指します。これには、特定のトピックにどのようなメッセージが織り込まれているかについて、異なる視点が必要になる場合があります。

これに関連して、「トピックのパーティション数はいくつ必要か。」という質問がよく寄せられます。従来、トピックの作成後にパーティションを追加しても、そのトピックに保持されている既存のデータのパーティショニングは変更されないため（したがって、パーティション内でメッセージの順序を提供するためにパーティショニングに依存している利用者に影響を与える可能性があるため）、過大評価することがアドバイスです。これは良いアドバイスです。ただし、追加でいくつかの考慮事項を提案します。

MB/秒単位のスループットが期待されるトピックや、アプリケーションをスケールアップするにつれてスループットが増加する可能性があるトピックの場合は、負荷が複数のブローカーに分散できるように、複数のパーティションを使用することを強くお勧めします。Event Streamsサービスは、常に複数の3ブローカーでKafkaを実行します。本記事の執筆時点では、ブローカーは最大9件までですが、将来的にはこの数は増加する予定です。トピックのパーティション数として3の倍数を選択すると、すべてのブローカーで均等にバランスをとることができます。 トピックのパーティション数は、Kafka利用者がトピックからの消費メッセージをKafka利用者グループと有用に共有できる数の制限です（詳細については、後述します）。トピックにパーティションよりも多くの利用者を利用者グループに追加すると、一部の利用者はメッセージ・データを消費せずにアイドル状態になります。 単一パーティションのトピックを使用することについても、単一パーティションのトピックが大量のメッセージ・トラフィックを受信しないと確信している場合や、トピック内の順序に依存せず、後でパーティションを追加しても問題ない場合があれば、それ自体は問題ありません。

5. 利用者グループのリバランシングは、驚くべき破壊的変化をもたらす可能性がある

メッセージを消費するほとんどのKafkaアプリケーションは、Kafkaの利用者・グループ機能を利用して、どのクライアントがどのトピック・パーティションから消費するかを調整します。利用者グループの想起が少し混乱している場合に、重要なポイントについて簡単にご確認ください。

利用者・グループはKafkaクライアントのグループを調整し、いつでも1つのクライアントだけが特定のトピック・パーティションからメッセージを受信するようにします。これは、アプリケーションの多数のインスタンス間でトピックに関するメッセージを共有する必要がある場合に便利です。

Kafkaクライアントが利用者グループに参加したり、以前に参加した利用者グループから離脱したりすると、利用者グループのバランスが取り直されます。通常、クライアントは、自身が所属するアプリケーションが開始されると利用者グループに参加し、アプリケーションがシャットダウン、再起動、またはクラッシュしたときに終了します。

グループのバランスが再調整されると、トピック・パーティションはグループのメンバー間で再配分されます。そのため、あるクライアントがグループに参加する場合、既にグループに含まれているクライアントの一部は、トピックのパーティションをトピックから切り離して（Kafkaの用語で「取り消す」）、新しく参加するクライアントに提供する可能性があります。逆も真であり、クライアントがグループを離れると、そのグループに割り当てられたトピック・パーティションは残りのメンバーの間で再配布されます。

Kafkaが成熟するにつれて、ますます洗練されたリバランシング・アルゴリズムが考案されています（そして今後も考案され続けます）。Kafkaの初期バージョンでは、利用者グループがバランスを再調整すると、グループ内のすべてのクライアントが消費を停止する必要があり、トピックのパーティションはグループの新しいメンバー間で再配布され、すべてのクライアントが再び消費を開始していました。このアプローチには2つの欠点があります（これらは以前から改善されているため、ご心配いりません）。

再バランスが行われる間、グループ内のすべてのクライアントがメッセージの消費を停止します。これはスループットに明らかな影響を与えます。 Kafkaクライアントは通常、アプリケーションにまだ配信されていないメッセージのバッファーを保持し、バッファーが使い込まれる前にブローカーからさらに多くのメッセージを取得しようとします。その目的は、Kafka ブローカーからより多くのメッセージがフェッチされている間、アプリケーションへのメッセージ配信が停止するのを防ぐことです（この記事の前半で説明したように、Kafka クライアントもネットワークの往復を待たないようにしています）。残念ながら、リバランスによってパーティションがクライアントから取り消される場合、そのパーティションのバッファリングされているデータはすべて破棄されます。同様に、リバランシングによって新しいパーティションがクライアントに割り当てられると、クライアントはパーティションの最後にコミットされたオフセットから開始してデータのバッファーを開始し、ブローカーからクライアントへのネットワーク・スループットが急増する可能性があります。これは、パーティションが新たに割り当てられたクライアントによって、以前にパーティションが取り消されたクライアントによってバッファリングされていたメッセージ・データを再読み取りしていることが原因です。

最近のリバランス・アルゴリズムは、Kafkaの用語で言えば「定着性」と「連携」を追加することで大幅な改善を加えています。

「一貫性のある」アルゴリズムは、再バランス後に、できるだけ多くのグループ・メンバーが再バランス前に行った同じパーティションを維持するようにしようとします。これにより、リバランスが行われたときにKafkaから破棄または再読み取りされる、バッファー内のメッセージ・データの量が最小限に抑えられます。

「協力的」アルゴリズムにより、クライアントはリバランスが発生する間、メッセージを利用し続けることができます。クライアントがリバランスの前にパーティションを割り当て、リバランスが発生した後にそのパーティションを保持すると、リバランスまでに中断のないパーティションから消費し続けることができます。これは、パーティションを同じクライアントに割り当て続けるように機能する「定着性」と相乗効果を発揮します。

最近のリバランシング・アルゴリズムのこのような機能強化にもかかわらず、アプリケーションが利用者グループのリバランスの対象となることが頻繁にある場合、全体的なメッセージング・スループットに影響が生じ、クライアントがバッファリングされたメッセージ・データを破棄して再取得するため、ネットワーク帯域幅が無駄になります。実行可能なことについていくつかの提案があります。