package org.apache.flink.statefun.flink.core.httpfn;

import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;
import okhttp3.Call;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostBodyUtil;
import org.apache.flink.statefun.flink.core.common.PolyglotUtil;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.class */
final class DefaultHttpRequestReplyClient implements RequestReplyClient {
    private static final MediaType MEDIA_TYPE_BINARY = MediaType.parse(HttpPostBodyUtil.DEFAULT_BINARY_CONTENT_TYPE);
    private final HttpUrl url;
    private final OkHttpClient client;
    private final BooleanSupplier isShutdown;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultHttpRequestReplyClient(HttpUrl httpUrl, OkHttpClient okHttpClient, BooleanSupplier booleanSupplier) {
        this.url = (HttpUrl) Objects.requireNonNull(httpUrl);
        this.client = (OkHttpClient) Objects.requireNonNull(okHttpClient);
        this.isShutdown = (BooleanSupplier) Objects.requireNonNull(booleanSupplier);
    }

    @Override // org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient
    public CompletableFuture<FromFunction> call(ToFunctionRequestSummary toFunctionRequestSummary, RemoteInvocationMetrics remoteInvocationMetrics, ToFunction toFunction) {
        Call newCall = this.client.newCall(new Request.Builder().url(this.url).post(RequestBody.create(MEDIA_TYPE_BINARY, toFunction.toByteArray())).build());
        RetryingCallback retryingCallback = new RetryingCallback(toFunctionRequestSummary, remoteInvocationMetrics, newCall.timeout(), this.isShutdown);
        retryingCallback.attachToCall(newCall);
        return retryingCallback.future().thenApply(DefaultHttpRequestReplyClient::parseResponse);
    }

    private static FromFunction parseResponse(Response response) {
        InputStream responseBody = responseBody(response);
        try {
            return (FromFunction) PolyglotUtil.parseProtobufOrThrow(FromFunction.parser(), responseBody);
        } finally {
            IOUtils.closeQuietly(responseBody);
        }
    }

    private static InputStream responseBody(Response response) {
        Preconditions.checkState(response.isSuccessful(), "Unexpected HTTP status code %s", new Object[]{Integer.valueOf(response.code())});
        Preconditions.checkState(response.body() != null, "Unexpected empty HTTP response (no body)");
        Preconditions.checkState(Objects.equals(response.body().contentType(), MEDIA_TYPE_BINARY), "Wrong HTTP content-type %s", new Object[]{response.body().contentType()});
        return response.body().byteStream();
    }
}
