/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.annotations.SdkInternalApi;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.Response;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.client.config.SdkClientOption;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.AbortedException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.ApiCallTimeoutException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.SdkInterruptedException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestToResponsePipeline;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.timers.TimeoutTracker;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.timers.TimerUtils;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpFullRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.FunctionalUtils;

@SdkInternalApi
public final class ApiCallTimeoutTrackingStage<OutputT>
implements RequestToResponsePipeline<OutputT> {
    private final RequestPipeline<SdkHttpFullRequest, Response<OutputT>> wrapped;
    private final SdkClientConfiguration clientConfig;
    private final ScheduledExecutorService timeoutExecutor;
    private final Duration apiCallTimeout;

    public ApiCallTimeoutTrackingStage(HttpClientDependencies dependencies, RequestPipeline<SdkHttpFullRequest, Response<OutputT>> wrapped) {
        this.wrapped = wrapped;
        this.clientConfig = dependencies.clientConfiguration();
        this.timeoutExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);
        this.apiCallTimeout = this.clientConfig.option(SdkClientOption.API_CALL_TIMEOUT);
    }

    @Override
    public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionContext context) throws Exception {
        try {
            return this.executeWithTimer(request, context);
        }
        catch (Exception e) {
            throw this.translatePipelineException(context, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Response<OutputT> executeWithTimer(SdkHttpFullRequest request, RequestExecutionContext context) throws Exception {
        Response<OutputT> response;
        long timeoutInMillis = TimerUtils.resolveTimeoutInMillis(context.requestConfig()::apiCallTimeout, this.apiCallTimeout);
        TimeoutTracker timeoutTracker = TimerUtils.timeSyncTaskIfNeeded(this.timeoutExecutor, timeoutInMillis, Thread.currentThread());
        try {
            context.apiCallTimeoutTracker(timeoutTracker);
            response = this.wrapped.execute(request, context);
        }
        finally {
            timeoutTracker.cancel();
        }
        if (timeoutTracker.hasExecuted()) {
            Thread.interrupted();
        }
        return response;
    }

    private Exception translatePipelineException(RequestExecutionContext context, Exception e) {
        if (e instanceof InterruptedException) {
            return this.handleInterruptedException(context, (InterruptedException)e);
        }
        if (context.apiCallTimeoutTracker().hasExecuted()) {
            Thread.interrupted();
        }
        return e;
    }

    private RuntimeException handleInterruptedException(RequestExecutionContext context, InterruptedException e) {
        if (e instanceof SdkInterruptedException) {
            ((SdkInterruptedException)e).getResponseStream().ifPresent(r -> FunctionalUtils.invokeSafely(r::close));
        }
        if (context.apiCallTimeoutTracker().hasExecuted()) {
            Thread.interrupted();
            return this.generateApiCallTimeoutException(context);
        }
        Thread.currentThread().interrupt();
        return AbortedException.create("Thread was interrupted", e);
    }

    private ApiCallTimeoutException generateApiCallTimeoutException(RequestExecutionContext context) {
        return ApiCallTimeoutException.create(TimerUtils.resolveTimeoutInMillis(context.requestConfig()::apiCallTimeout, this.apiCallTimeout));
    }
}

