package org.apache.flink.statefun.flink.core.httpfn;

import java.net.URI;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import org.apache.flink.statefun.flink.core.common.ManagingResources;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;

@NotThreadSafe
/* loaded from: input_file:org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.class */
public final class HttpFunctionProvider implements StatefulFunctionProvider, ManagingResources {
    private final Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs;
    private final Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs;

    @Nullable
    private OkHttpClient sharedClient;
    private volatile boolean shutdown;

    public HttpFunctionProvider(Map<FunctionType, HttpFunctionEndpointSpec> map, Map<String, HttpFunctionEndpointSpec> map2) {
        this.specificTypeEndpointSpecs = (Map) Objects.requireNonNull(map);
        this.perNamespaceEndpointSpecs = (Map) Objects.requireNonNull(map2);
    }

    @Override // org.apache.flink.statefun.sdk.StatefulFunctionProvider
    public StatefulFunction functionOfType(FunctionType functionType) {
        HttpFunctionEndpointSpec endpointsSpecOrThrow = getEndpointsSpecOrThrow(functionType);
        return new RequestReplyFunction(endpointsSpecOrThrow.maxNumBatchRequests(), buildHttpClient(endpointsSpecOrThrow, functionType));
    }

    private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionType) {
        HttpFunctionEndpointSpec httpFunctionEndpointSpec = this.specificTypeEndpointSpecs.get(functionType);
        if (httpFunctionEndpointSpec != null) {
            return httpFunctionEndpointSpec;
        }
        HttpFunctionEndpointSpec httpFunctionEndpointSpec2 = this.perNamespaceEndpointSpecs.get(functionType.namespace());
        if (httpFunctionEndpointSpec2 != null) {
            return httpFunctionEndpointSpec2;
        }
        throw new IllegalStateException("Unknown type: " + functionType);
    }

    private RequestReplyClient buildHttpClient(HttpFunctionEndpointSpec httpFunctionEndpointSpec, FunctionType functionType) {
        HttpUrl httpUrl;
        if (this.sharedClient == null) {
            this.sharedClient = OkHttpUtils.newClient();
        }
        OkHttpClient.Builder newBuilder = this.sharedClient.newBuilder();
        newBuilder.callTimeout(httpFunctionEndpointSpec.maxRequestDuration());
        newBuilder.connectTimeout(httpFunctionEndpointSpec.connectTimeout());
        newBuilder.readTimeout(httpFunctionEndpointSpec.readTimeout());
        newBuilder.writeTimeout(httpFunctionEndpointSpec.writeTimeout());
        URI apply = httpFunctionEndpointSpec.urlPathTemplate().apply(functionType);
        if (UnixDomainHttpEndpoint.validate(apply)) {
            UnixDomainHttpEndpoint parseFrom = UnixDomainHttpEndpoint.parseFrom(apply);
            httpUrl = new HttpUrl.Builder().scheme("http").host("unused").addPathSegment(parseFrom.pathSegment).build();
            OkHttpUnixSocketBridge.configureUnixDomainSocket(newBuilder, parseFrom.unixDomainFile);
        } else {
            httpUrl = HttpUrl.get(apply);
        }
        return new HttpRequestReplyClient(httpUrl, newBuilder.build(), () -> {
            return this.shutdown;
        });
    }

    @Override // org.apache.flink.statefun.flink.core.common.ManagingResources
    public void shutdown() {
        this.shutdown = true;
        OkHttpUtils.closeSilently(this.sharedClient);
    }
}
