/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.client.proxy;

import akka.dispatch.OnComplete;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateKeyGroupLocationException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.AbstractServerBase;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;

/**
 * Handler of the queryable state client proxy for incoming {@link KvStateRequest}s.
 *
 * <p>For every request the handler resolves the {@link KvStateLocation} of the requested state
 * (asking the job manager and caching the answer per job id and state name), determines the
 * server responsible for the key group of the requested key, forwards a
 * {@link KvStateInternalRequest} to that server through an internal {@link Client} and relays the
 * {@link KvStateResponse} to the caller.
 */
@ChannelHandler.Sharable
@Internal
public class KvStateClientProxyHandler
extends AbstractServerHandler<KvStateRequest, KvStateResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(KvStateClientProxyHandler.class);

    /** Timeout, in milliseconds, of the location lookup request sent to the job manager. */
    private static final long LOOKUP_TIMEOUT_MILLIS = 1000L;

    /** The proxy this handler belongs to; used to obtain the job manager gateway future. */
    private final KvStateClientProxy proxy;

    /** Cache of (pending or completed) location lookups, keyed by job id and state name. */
    private final ConcurrentMap<Tuple2<JobID, String>, CompletableFuture<KvStateLocation>> lookupCache = new ConcurrentHashMap<>();

    /** Client used to forward the internal requests to the servers holding the state. */
    private final Client<KvStateInternalRequest, KvStateResponse> kvStateClient;

    /**
     * Creates the handler.
     *
     * @param proxy the client proxy using this handler; must not be {@code null}
     * @param queryExecutorThreads number of threads passed to the internal client that forwards
     *     the requests
     * @param serializer serializer for the external request/response messages
     * @param stats collector passed on to the parent handler
     */
    public KvStateClientProxyHandler(KvStateClientProxyImpl proxy, int queryExecutorThreads, MessageSerializer<KvStateRequest, KvStateResponse> serializer, KvStateRequestStats stats) {
        super(proxy, serializer, stats);
        this.proxy = Preconditions.checkNotNull(proxy);
        this.kvStateClient = createInternalClient(queryExecutorThreads);
    }

    /**
     * Builds the client that talks to the state servers. Its statistics are disabled
     * ({@link DisabledKvStateRequestStats}).
     *
     * @param threads number of threads handed to the client
     * @return the newly created client
     */
    private static Client<KvStateInternalRequest, KvStateResponse> createInternalClient(int threads) {
        final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer = new MessageSerializer<>(
                new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
                new KvStateResponse.KvStateResponseDeserializer());
        return new Client<>("Queryable State Proxy Client", threads, messageSerializer, new DisabledKvStateRequestStats());
    }

    /**
     * Starts the asynchronous processing of the given request.
     *
     * @param requestId id of the request (not used by this handler)
     * @param request the request to serve
     * @return future completed with the response or with the failure of the request
     */
    public CompletableFuture<KvStateResponse> handleRequest(long requestId, KvStateRequest request) {
        final CompletableFuture<KvStateResponse> response = new CompletableFuture<>();
        executeActionAsync(response, request, false);
        return response;
    }

    /**
     * Fetches the state and completes {@code result} with the outcome. If the attempt fails
     * because of an {@link UnknownKvStateIdException}, an
     * {@link UnknownKvStateKeyGroupLocationException} or a {@link ConnectException}, the request is
     * retried with a forced refresh of the cached location. Any other failure is propagated.
     *
     * <p>NOTE: the number of retries is not bounded here; retrying stops once {@code result} is
     * done.
     *
     * @param result future to complete with the final outcome
     * @param request the request to serve
     * @param update whether the cached location has to be refreshed
     */
    private void executeActionAsync(CompletableFuture<KvStateResponse> result, KvStateRequest request, boolean update) {
        if (result.isDone()) {
            return;
        }

        final CompletableFuture<KvStateResponse> operationFuture = getState(request, update);

        operationFuture.whenCompleteAsync((response, throwable) -> {
            if (throwable == null) {
                result.complete(response);
                return;
            }

            // The interesting failure is the cause of the reported throwable.
            final Throwable cause = throwable.getCause();
            if (cause instanceof UnknownKvStateIdException
                    || cause instanceof UnknownKvStateKeyGroupLocationException
                    || cause instanceof ConnectException) {
                LOG.debug("Retrying after failing to retrieve state due to: {}.", cause.getMessage());
                executeActionAsync(result, request, true);
            } else {
                result.completeExceptionally(throwable);
            }
        }, this.queryExecutor);

        // Once the result is complete there is no need to keep the pending operation around.
        result.whenComplete((response, throwable) -> operationFuture.cancel(false));
    }

    /**
     * Looks up the location of the requested state and forwards the request to the server
     * responsible for the key group of the requested key.
     *
     * @param request the request to serve
     * @param forceUpdate whether the cached location has to be refreshed
     * @return future with the response of the server; fails with an
     *     {@link UnknownKvStateKeyGroupLocationException} if no server address is known for the
     *     key group
     */
    private CompletableFuture<KvStateResponse> getState(KvStateRequest request, boolean forceUpdate) {
        return getKvStateLookupInfo(request.getJobId(), request.getStateName(), forceUpdate).thenComposeAsync(location -> {
            final int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(request.getKeyHashCode(), location.getNumKeyGroups());
            final InetSocketAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex);
            if (serverAddress == null) {
                return FutureUtils.<KvStateResponse>completedExceptionally(new UnknownKvStateKeyGroupLocationException(getServerName()));
            }

            // Query the server that holds the key group.
            final KvStateID kvStateId = location.getKvStateID(keyGroupIndex);
            final KvStateInternalRequest internalRequest = new KvStateInternalRequest(kvStateId, request.getSerializedKeyAndNamespace());
            return kvStateClient.sendRequest(serverAddress, internalRequest);
        }, this.queryExecutor);
    }

    /**
     * Returns the location of the given state, either from the cache or by asking the job manager.
     *
     * <p>A cached entry is used only if {@code forceUpdate} is {@code false} and the cached future
     * has not completed exceptionally. Otherwise a new placeholder future is put into the cache
     * and the job manager is asked for the location.
     *
     * @param jobId id of the job the state belongs to
     * @param queryableStateName name under which the state was made queryable
     * @param forceUpdate whether to bypass the cache
     * @return future with the location of the state
     */
    private CompletableFuture<KvStateLocation> getKvStateLookupInfo(JobID jobId, String queryableStateName, boolean forceUpdate) {
        final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName);
        final CompletableFuture<KvStateLocation> cachedFuture = lookupCache.get(cacheKey);

        if (!forceUpdate && cachedFuture != null && !cachedFuture.isCompletedExceptionally()) {
            LOG.debug("Retrieving location for state={} of job={} from the cache.", queryableStateName, jobId);
            return cachedFuture;
        }

        LOG.debug("Retrieving location for state={} of job={} from the job manager.", queryableStateName, jobId);

        final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
        lookupCache.put(cacheKey, location);

        final CompletableFuture<KvStateLocation> lookup = proxy.getJobManagerFuture().thenComposeAsync(jobManagerGateway -> {
            final KvStateMessage.LookupKvStateLocation msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
            jobManagerGateway.ask(msg, FiniteDuration.apply(LOOKUP_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
                    .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
                    .onComplete(new OnComplete<KvStateLocation>() {

                        public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
                            if (failure != null) {
                                if (failure instanceof FlinkJobNotFoundException) {
                                    // The job is unknown: drop the entry, but only if it is still
                                    // ours and has not been replaced by a newer lookup.
                                    lookupCache.remove(cacheKey, location);
                                }
                                location.completeExceptionally(failure);
                            } else {
                                location.complete(loc);
                            }
                        }
                    }, Executors.directExecutionContext());
            return location;
        }, this.queryExecutor);

        // If the job manager future itself failed (or the lookup could not even be issued), the
        // callback above never runs. Fail the cached placeholder as well, otherwise later requests
        // would pick up a cached future that never completes. This is a no-op when the placeholder
        // has already been completed by the callback.
        lookup.whenComplete((ignored, failure) -> {
            if (failure != null) {
                location.completeExceptionally(failure);
            }
        });

        return lookup;
    }

    /**
     * Shuts down the internal client used to forward the requests.
     *
     * @return the future returned by the shutdown of the internal client
     */
    public CompletableFuture<Void> shutdown() {
        return this.kvStateClient.shutdown();
    }
}

