Liberty でのマイクロサービス回復力の向上

MicroProfile のフォールト・トレランス・フィーチャーを使用すると、サービス呼び出しの回復力を向上させることができます。 このフィーチャーは、Eclipse Microprofile Fault Tolerance 仕様 1.0、1.1、および 2.0 の実装です。 このフィーチャーにより、再試行、回路ブレーカー、バルクヘッド、タイムアウト、およびフォールバックを含むパターンを使用して、回復力のあるマイクロサービスをサポートするプログラミング・モデルが提供されます。

Open Liberty MicroProfile Fault Tolerance に関する最新情報は、 Open Liberty Web サイトから入手できます。

始めに

MicroProfile Fault Tolerance フィーチャーによって実装されたオープン・ソースの MicroProfile 仕様について詳しくは、 Eclipse Microprofile Fault Tolerance Specification 2.0を参照してください。

手順

  1. server.xml ファイル内の featureManager エレメントに mpFaultTolerance-1.0mpFaultTolerance-1.1、または mpFaultTolerance-2.0 フィーチャーを追加します。
    <featureManager>
       <feature>mpFaultTolerance-2.0</feature>
    </featureManager>
    
  2. コード・スニペットを使用して、マイクロサービスの回復力を向上させます。
    フォールト・トレランス回路ブレーカーは、システムがフェイル・ファストを実行する方法を提供します。 サービスの実行を一時的に使用不可にすることで、そのサービスがシステムに過負荷をかけるのを防ぎます。
    フォールト・トレランス RetryPolicy は、どの時点でサービスを再試行するかを構成する方法を提供します。 メイン・サービスに障害が起こった場合、使用するサービスをフォールト・トレランス・フォールバックで指定できます。
    フォールト・トレランスは、メソッドに @Asynchronous のアノテーションを付けて、メソッドが Future または (mpFaultTolerance-2.0 を使用している場合は) CompletionStage を返すようにすることによって、メソッドを非同期に実行する機能を提供します。 この機能により、他のフォールト・トレランス・アノテーションをより強力にすることができます。例えば、待機中のメソッド呼び出しを Bulkhead がキューに入れることができるようにしたり、メソッドが割り込まれていることに対して応答しない場合でも、制限時間に達した際に @Timeout ができるだけ早く結果を返せるようにしたりできます。
    フォールト・トレランス・バルクヘッドは、サービスに対する同時呼び出しの数を制限します。 また、このバルクヘッドは、サービス呼び出しが使用できるシステム・リソースの量を制限します。
  3. アノテーション・パラメーターを構成します
    • MicroProfile Config を使用して、フォールト・トレランス・アノテーションのパラメーターをランタイムにオーバーライドできます。 このフィーチャーについて詳しくは、 仕様の第 12 章 を参照してください。
    • (mpFaultTolerance-1.1 以降を使用している場合) フォールト・トレランス・アノテーションでアノテーションを付けられたそれぞれのメソッドをモニターできるように、メトリックが自動的にエクスポートされます。 このフィーチャーについて詳しくは、 Spec の第 11 章 およびこの OpenLiberty ブログ投稿 を参照してください。
    • (mpFaultTolerance-2.0 を使用している場合) フォールト・トレランスは CDI インターセプターを使用して実装されます。 mpFaultTolerance-2.0 を使用しており、アプリケーションで他の CDI インターセプターも使用している場合は、フォールト・トレランス・インターセプターの優先順位を調整して、他のインターセプターと対話する方法を構成することができます。 これを行う方法については、 仕様の第 3 章 を参照してください。
    • フォールト・トレランスの異なるバージョン間の変更の要約については、 仕様のリリース・ノート を参照してください。

回路ブレーカーのコード・スニペット 1: CircuitBreaker および Timeout が構成された CircuitBreakerBean の作成。

@RequestScoped
public class CircuitBreakerBean {

    private int executionCounterA = 0;

    // The combined effect of the specified requestVolumeThreshold and failureRatio is that 3  
    // failures will trigger the circuit to open.
    // After a 1 second delay the Circuit will allow fresh attempts to invoke the service.
    @CircuitBreaker(delay = 1, delayUnit = ChronoUnit.SECONDS, requestVolumeThreshold = 3, failureRatio = 1.0)
    // A service is considered to have timed out after 3 seconds
    @Timeout(value = 3, unit = ChronoUnit.SECONDS)
    public String serviceA() {
        executionCounterA++;

        if (executionCounterA <= 3) {
            //Sleep for 10 secs to force a timeout
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                System.out.println("serviceA interrupted");
            }
        }

回路ブレーカーのコード・スニペット 2: CircuitBreakerBean の使用。

@Inject
CircuitBreakerBean bean;

// FaultTolerance bean with circuit breaker, should fail 3 times
for (int i = 0; i < 3; i++) {
    try {
        bean.serviceA();
        throw new AssertionError("TimeoutException not caught");
    } catch (TimeoutException e) {
            //expected
    }
}

// The CircuitBreaker should be open, so calling serviceA should generate a 
// CircuitBreakerOpenException.
try {
    bean.serviceA();
    throw new AssertionError("CircuitBreakerOpenException not caught");
} catch (CircuitBreakerOpenException e) {
    //expected
}

//allow time for the circuit to re-close
Thread.sleep(3000);

// The CircuitBreaker should be closed and serviceA should now succeed.
String res = bean.serviceA();
if (!"serviceA: 4".equals(res)) {
    throw new AssertionError("Bad Result: " + res);
}

フォールバックおよび再試行のコード・スニペット 1: FallbackHandler および Retry ポリシーが構成された FTServiceBean。

@RequestScoped
public class FTServiceBean {

    // Annotate serviceA with a named FallbackHandler and a Retry policy specifying the
    // number of retries.
    @Retry(maxRetries = 2)
    @Fallback(StringFallbackHandler.class)
    public String serviceA() {
        throw new RuntimeException("Connection failed");
        return null;
    }   
}

フォールバックおよび再試行のコード・スニペット 2: メイン・サービスに障害が起こった場合に駆動されるコードである FallbackHandler。

@Dependent
public class StringFallbackHandler implements FallbackHandler<String> {

    @Override
    public String handle(ExecutionContext context) {
        return "fallback for " + context.getMethod().getName();
    }
}

フォールバックおよび再試行のコード・スニペット 3: FTServiceBean の使用。

private @Inject FTServiceBean ftServiceBean;
    
try {
    // Call serviceA, which will be retried twice in the event of failure, after which
    // the FallbackHandler will be driven.
    String result = ftServiceBean.serviceA();
    if(!result.contains("serviceA"))
       throw new AssertionError("The message should be \"fallback for serviceA\"");
 }
catch(RuntimeException ex) {
    throw new AssertionError("serviceA should not throw a RuntimeException");
}
非同期のコード・スニペット 1: Future または CompletionStage を返すメソッドを持つ AsynchronousBean の作成。
@RequestScoped
public class AsynchronousBean {
    
    @Asynchronous
    public Future<String> serviceA() {
        try {
            // Sleep to simulate work
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException("serviceA interrupted", e);
        }
        // Return the result in a completed CompletableFuture
        return CompletableFuture.completedFuture("serviceA OK");
    }
    
    // Note: returning a CompletionStage requires Fault Tolerance 2.0
    @Asynchronous
    public CompletionStage<String> serviceB() {
        try {
            // Sleep to simulate work
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException("serviceB interrupted", e);
        }
        // Return the result in a completed CompletableFuture (which implements CompletionStage)
        return CompletableFuture.completedFuture("serviceB OK");
    }

}
非同期のコード・スニペット 2: AsynchronousBean の使用。
@Inject AsynchronousBean asyncBean;

// serviceA and serviceB methods will run in parallel because they are annotated with @Asynchronous
Future<String> resultA = asyncBean.serviceA();
CompletionStage<String> resultB = asyncBean.serviceB();

// The CompletionStage returned from serviceB allows us to add actions which take place when the serviceB method finishes
resultB.thenAccept((r) -> System.out.println("ServiceB result: " + r))
       .exceptionally((ex) -> {
           System.out.println("ServiceB failed");
           ex.printStackTrace();
           return null;
       });

// For the Future returned from serviceA, we need to wait for it to finish, then we can handle the result
try {
    System.out.println("serviceA result: " + resultA.get());
} catch (ExecutionException ex) {
    System.out.println("ServiceA failed");
    ex.printStackTrace();
} catch (InterruptedException ex) {
    System.out.println("Interrupted waiting for serviceA");
}

バルクヘッドのコード・スニペット 1: Bulkhead が構成された BulkheadBean の作成。

@RequestScoped
@Asynchronous
public class BulkheadBean {

    private final AtomicInteger connectATokens = new AtomicInteger(0);

    // Configure a Bulkhead that supports at most 2 concurrent threads.
    @Bulkhead(maxThreads = 2)
    public Future<Boolean> connectA(String data) throws InterruptedException {
        System.out.println("connectA starting " + data);
        int token = connectATokens.incrementAndGet();
        try {
            if (token > 2) {
                throw new RuntimeException("Too many threads in connectA[" + data + "]: " + token);
            }
            Thread.sleep(5000);
            return CompletableFuture.completedFuture(Boolean.TRUE);
        } finally {
            connectATokens.decrementAndGet();
            System.out.println("connectA complete " + data);
        }
    }
}

バルクヘッドのコード・スニペット 2: BulkheadBean の使用。

@Inject
BulkheadBean bean;

// connectA has a poolSize of 2
// The first two calls to connectA should be run straight away, in parallel, each around
// 5 seconds
Future<Boolean> future1 = bean.connectA("One");
Thread.sleep(100);
Future<Boolean> future2 = bean.connectA("Two");
Thread.sleep(100);

// The next two calls to connectA should wait until the first 2 have finished
Future<Boolean> future3 = bean.connectA("Three");
Thread.sleep(100);
Future<Boolean> future4 = bean.connectA("Four");
Thread.sleep(100);

//total time should be just over 10s
Thread.sleep(11000);

if (!future1.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future1 did not complete properly");
}
if (!future2.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future2 did not complete properly");
}
if (!future3.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future3 did not complete properly");
}
if (!future4.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future4 did not complete properly");
}