package org.apache.flink.queryablestate.itcases;

import com.esotericsoftware.kryo.Serializer;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URLClassLoader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.class */
public abstract class AbstractQueryableStateTestBase {
    private static final long RETRY_TIMEOUT = 50;
    private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor());
    private StateBackend stateBackend;
    protected static QueryableStateClient client;
    protected static ClusterClient<?> clusterClient;
    protected static int maxParallelism;

    @TempDir
    static File classloaderFolder;
    private static final Duration TEST_TIMEOUT = Duration.ofSeconds(200);

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension<>(() -> {
        return Executors.newScheduledThreadPool(4);
    });

    /* loaded from: input_file:org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase$AggregatingTestOperator.class */
    private static class AggregatingTestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<Tuple2<Integer, Long>, String> {
        private static final long serialVersionUID = 1;
        private final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDescriptor;
        private transient AggregatingState<Tuple2<Integer, Long>, String> state;

        AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggregatingStateDescriptor) {
            this.stateDescriptor = aggregatingStateDescriptor;
        }

        public void open() throws Exception {
            super.open();
            this.state = getKeyedStateBackend().getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, this.stateDescriptor);
        }

        public void processElement(StreamRecord<Tuple2<Integer, Long>> streamRecord) throws Exception {
            this.state.add(streamRecord.getValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase$AutoCancellableJob.class */
    private static class AutoCancellableJob implements AutoCloseable {
        private final ClusterClient<?> clusterClient;
        private final JobGraph jobGraph;
        private final JobID jobId;
        private final Deadline deadline;

        AutoCancellableJob(Deadline deadline, ClusterClient<?> clusterClient, StreamExecutionEnvironment streamExecutionEnvironment) {
            Preconditions.checkNotNull(streamExecutionEnvironment);
            this.clusterClient = (ClusterClient) Preconditions.checkNotNull(clusterClient);
            this.jobGraph = streamExecutionEnvironment.getStreamGraph().getJobGraph();
            this.jobId = (JobID) Preconditions.checkNotNull(this.jobGraph.getJobID());
            this.deadline = deadline;
        }

        JobGraph getJobGraph() {
            return this.jobGraph;
        }

        JobID getJobId() {
            return this.jobId;
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            this.clusterClient.cancel(this.jobId).get();
            Assertions.assertThat((Comparable) FutureUtils.retrySuccessfulWithDelay(() -> {
                return this.clusterClient.getJobStatus(this.jobId);
            }, Duration.ofMillis(AbstractQueryableStateTestBase.RETRY_TIMEOUT), this.deadline, jobStatus -> {
                return jobStatus.equals(JobStatus.CANCELED);
            }, new ScheduledExecutorServiceAdapter((ScheduledExecutorService) AbstractQueryableStateTestBase.EXECUTOR_EXTENSION.getExecutor())).get(this.deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)).isEqualTo(JobStatus.CANCELED);
        }
    }

    /* loaded from: input_file:org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase$SumAggr.class */
    private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, String, String> {
        private static final long serialVersionUID = -6249227626701264599L;

        private SumAggr() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public String m1createAccumulator() {
            return "0";
        }

        public String add(Tuple2<Integer, Long> tuple2, String str) {
            return Long.toString(Long.valueOf(str).longValue() + ((Long) tuple2.f1).longValue());
        }

        public String getResult(String str) {
            return str;
        }

        public String merge(String str, String str2) {
            return Long.toString(Long.valueOf(str).longValue() + Long.valueOf(str2).longValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase$SumReduce.class */
    protected static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>> {
        private static final long serialVersionUID = -8651235077342052336L;

        protected SumReduce() {
        }

        public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> tuple2, Tuple2<Integer, Long> tuple22) throws Exception {
            tuple2.f1 = Long.valueOf(((Long) tuple2.f1).longValue() + ((Long) tuple22.f1).longValue());
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase$TestAscendingValueSource.class */
    private static class TestAscendingValueSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1459935229498173245L;
        private final long maxValue;
        private volatile boolean isRunning = true;

        TestAscendingValueSource(long j) {
            Preconditions.checkArgument(j >= 0);
            this.maxValue = j;
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Long>> sourceContext) throws Exception {
            Tuple2 tuple2 = new Tuple2(Integer.valueOf(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()), 0L);
            long j = 0;
            while (true) {
                long j2 = j;
                if (!this.isRunning || j2 > this.maxValue) {
                    break;
                }
                synchronized (sourceContext.getCheckpointLock()) {
                    tuple2.f1 = Long.valueOf(j2);
                    sourceContext.collect(tuple2);
                }
                j = j2 + 1;
            }
            while (this.isRunning) {
                synchronized (this) {
                    wait();
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
            synchronized (this) {
                notifyAll();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase$TestKeyRangeSource.class */
    private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> implements CheckpointListener {
        private static final long serialVersionUID = -5744725196953582710L;
        private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong();
        private final int numKeys;
        private final ThreadLocalRandom random = ThreadLocalRandom.current();
        private volatile boolean isRunning = true;
        private int counter = 0;

        TestKeyRangeSource(int i) {
            this.numKeys = i;
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            if (getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
                LATEST_CHECKPOINT_ID.set(0L);
            }
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Long>> sourceContext) throws Exception {
            Tuple2 tuple2 = new Tuple2(0, 1L);
            while (this.isRunning) {
                synchronized (sourceContext.getCheckpointLock()) {
                    tuple2.f0 = Integer.valueOf(this.random.nextInt(this.numKeys));
                    sourceContext.collect(tuple2);
                    this.counter++;
                }
                if (this.counter % 50 == 0) {
                    Thread.sleep(1L);
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
                LATEST_CHECKPOINT_ID.set(j);
            }
        }

        public void notifyCheckpointAborted(long j) {
        }
    }

    @BeforeEach
    void setUp() throws Exception {
        this.stateBackend = createStateBackend();
        Assertions.assertThat(clusterClient).isNotNull();
        maxParallelism = 4;
    }

    protected abstract StateBackend createStateBackend() throws Exception;

    @Test
    void testQueryableState() throws Exception {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        DataStreamSource addSource = executionEnvironment.addSource(new TestKeyRangeSource(256));
        ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("any-name", new SumReduce(), addSource.getType());
        addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.1
            private static final long serialVersionUID = 7143749578983540352L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).asQueryableState("hakuna-matata", reducingStateDescriptor);
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            try {
                JobID jobId = autoCancellableJob.getJobId();
                clusterClient.submitJob(autoCancellableJob.getJobGraph()).get();
                AtomicLongArray atomicLongArray = new AtomicLongArray(256);
                ArrayList arrayList = new ArrayList(256);
                boolean z = false;
                while (!z && plus.hasTimeLeft()) {
                    z = true;
                    arrayList.clear();
                    for (int i = 0; i < 256; i++) {
                        int i2 = i;
                        if (atomicLongArray.get(i2) <= 0) {
                            z = false;
                            CompletableFuture kvState = getKvState(plus, client, jobId, "hakuna-matata", Integer.valueOf(i2), BasicTypeInfo.INT_TYPE_INFO, reducingStateDescriptor, false, this.executor);
                            kvState.thenAccept(reducingState -> {
                                Assertions.assertThatCode(() -> {
                                    Tuple2 tuple2 = (Tuple2) reducingState.get();
                                    atomicLongArray.set(i2, ((Long) tuple2.f1).longValue());
                                    Assertions.assertThat(i2).isEqualTo(((Integer) tuple2.f0).intValue()).withFailMessage("Key mismatch", new Object[0]);
                                }).doesNotThrowAnyException();
                            });
                            arrayList.add(kvState);
                        }
                    }
                    CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[arrayList.size()])).get(plus.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                }
                Assertions.assertThat(z).isTrue().withFailMessage("Not all keys are non-zero", new Object[0]);
                for (int i3 = 0; i3 < 256; i3++) {
                    long j = atomicLongArray.get(i3);
                    Assertions.assertThat(j).isGreaterThan(0L).withFailMessage("Count at position " + i3 + " is " + j, new Object[0]);
                }
                if (autoCancellableJob != null) {
                    if (0 == 0) {
                        autoCancellableJob.close();
                        return;
                    }
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (autoCancellableJob != null) {
                if (th != null) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testDuplicateRegistrationFailsJob() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        DataStreamSource addSource = executionEnvironment.addSource(new TestKeyRangeSource(256));
        addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.2
            private static final long serialVersionUID = -4126824763829132959L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).asQueryableState("duplicate-me", new ReducingStateDescriptor("any-name", new SumReduce(), addSource.getType()));
        addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.3
            private static final long serialVersionUID = -6265024000462809436L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).asQueryableState("duplicate-me");
        CompletableFuture submitJob = clusterClient.submitJob(executionEnvironment.getStreamGraph().getJobGraph());
        ClusterClient<?> clusterClient2 = clusterClient;
        clusterClient2.getClass();
        submitJob.thenCompose(clusterClient2::requestJobResult).thenApply((v0) -> {
            return v0.getSerializedThrowable();
        }).thenAccept(optional -> {
            Assertions.assertThat(optional).isPresent();
            Assertions.assertThat(ExceptionUtils.stringifyException(((SerializedThrowable) optional.get()).deserializeError(getClass().getClassLoader()))).contains(new CharSequence[]{"KvState with name 'duplicate-me' has already been registered by another operator"});
        }).get();
    }

    @Test
    void testValueState() throws Exception {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        DataStreamSource addSource = executionEnvironment.addSource(new TestAscendingValueSource(1024L));
        ValueStateDescriptor<Tuple2<Integer, Long>> valueStateDescriptor = new ValueStateDescriptor<>("any", addSource.getType());
        addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.4
            private static final long serialVersionUID = 7662520075515707428L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).asQueryableState("hakuna", valueStateDescriptor);
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            try {
                JobID jobId = autoCancellableJob.getJobId();
                clusterClient.submitJob(autoCancellableJob.getJobGraph()).get();
                executeValueQuery(plus, client, jobId, "hakuna", valueStateDescriptor, 1024L);
                if (autoCancellableJob != null) {
                    if (0 == 0) {
                        autoCancellableJob.close();
                        return;
                    }
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (autoCancellableJob != null) {
                if (th != null) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testCustomKryoSerializerHandling() throws Exception {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        URLClassLoader createLoaderWithCustomKryoSerializer = createLoaderWithCustomKryoSerializer("CustomKryo");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        executionEnvironment.getConfig().getSerializerConfig().addDefaultKryoSerializer(Byte.class, (Serializable) createSerializer(createLoaderWithCustomKryoSerializer));
        ValueStateDescriptor<Tuple2<Integer, Long>> valueStateDescriptor = new ValueStateDescriptor<>("any", new GenericTypeInfo(Tuple2.class));
        executionEnvironment.addSource(new TestAscendingValueSource(1L)).keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.5
            private static final long serialVersionUID = 7662520075515707428L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).asQueryableState("teriberka", valueStateDescriptor);
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            JobID jobId = autoCancellableJob.getJobId();
            JobGraph jobGraph = autoCancellableJob.getJobGraph();
            jobGraph.setClasspaths(Arrays.asList(createLoaderWithCustomKryoSerializer.getURLs()));
            clusterClient.submitJob(jobGraph).get();
            try {
                client.setUserClassLoader(createLoaderWithCustomKryoSerializer);
                executeValueQuery(plus, client, jobId, "teriberka", valueStateDescriptor, 1L);
                client.setUserClassLoader((ClassLoader) null);
                if (autoCancellableJob != null) {
                    if (0 == 0) {
                        autoCancellableJob.close();
                        return;
                    }
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                client.setUserClassLoader((ClassLoader) null);
                throw th3;
            }
        } catch (Throwable th4) {
            if (autoCancellableJob != null) {
                if (0 != 0) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th4;
        }
    }

    <T extends Serializer<Byte> & Serializable> T createSerializer(ClassLoader classLoader) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        return (Serializer) classLoader.loadClass("CustomKryo").newInstance();
    }

    @Disabled
    @Test
    void testWrongJobIdAndWrongQueryableStateName() throws Exception {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        DataStreamSource addSource = executionEnvironment.addSource(new TestAscendingValueSource(1024L));
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("any", addSource.getType());
        addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.6
            private static final long serialVersionUID = 7662520075515707428L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).asQueryableState("hakuna", valueStateDescriptor);
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            try {
                clusterClient.submitJob(autoCancellableJob.getJobGraph()).get();
                CompletableFuture jobStatus = clusterClient.getJobStatus(autoCancellableJob.getJobId());
                while (plus.hasTimeLeft() && !((JobStatus) jobStatus.get(plus.timeLeft().toMillis(), TimeUnit.MILLISECONDS)).equals(JobStatus.RUNNING)) {
                    Thread.sleep(RETRY_TIMEOUT);
                    jobStatus = clusterClient.getJobStatus(autoCancellableJob.getJobId());
                }
                Assertions.assertThat((Comparable) jobStatus.get(plus.timeLeft().toMillis(), TimeUnit.MILLISECONDS)).isEqualTo(JobStatus.RUNNING);
                JobID jobID = new JobID();
                CompletableFuture kvState = client.getKvState(jobID, "hakuna", 0, BasicTypeInfo.INT_TYPE_INFO, valueStateDescriptor);
                Assertions.assertThatThrownBy(() -> {
                }).isInstanceOf(ExecutionException.class).cause().isInstanceOf(RuntimeException.class).hasMessage("FlinkJobNotFoundException: Could not find Flink job (" + jobID + ")");
                CompletableFuture kvState2 = client.getKvState(autoCancellableJob.getJobId(), "wrong-hakuna", 0, BasicTypeInfo.INT_TYPE_INFO, valueStateDescriptor);
                Assertions.assertThatThrownBy(() -> {
                }).isInstanceOf(ExecutionException.class).cause().isInstanceOf(RuntimeException.class).hasMessage("UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hakuna'.");
                if (autoCancellableJob != null) {
                    if (0 == 0) {
                        autoCancellableJob.close();
                        return;
                    }
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (autoCancellableJob != null) {
                if (th != null) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testQueryNonStartedJobState() throws Exception {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        DataStreamSource addSource = executionEnvironment.addSource(new TestAscendingValueSource(1024L));
        ValueStateDescriptor<Tuple2<Integer, Long>> valueStateDescriptor = new ValueStateDescriptor<>("any", addSource.getType(), (Object) null);
        QueryableStateStream asQueryableState = addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.7
            private static final long serialVersionUID = 7480503339992214681L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).asQueryableState("hakuna", valueStateDescriptor);
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            try {
                JobID jobId = autoCancellableJob.getJobId();
                JobGraph jobGraph = autoCancellableJob.getJobGraph();
                client.getKvState(autoCancellableJob.getJobId(), asQueryableState.getQueryableStateName(), 0, BasicTypeInfo.INT_TYPE_INFO, valueStateDescriptor);
                clusterClient.submitJob(jobGraph).get();
                executeValueQuery(plus, client, jobId, "hakuna", valueStateDescriptor, 1024L);
                if (autoCancellableJob != null) {
                    if (0 == 0) {
                        autoCancellableJob.close();
                        return;
                    }
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (autoCancellableJob != null) {
                if (th != null) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testValueStateDefault() throws Throwable {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        DataStreamSource addSource = executionEnvironment.addSource(new TestAscendingValueSource(1024L));
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("any", addSource.getType(), Tuple2.of(0, 1337L));
        QueryableStateStream asQueryableState = addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.8
            private static final long serialVersionUID = 4509274556892655887L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return 1;
            }
        }).asQueryableState("hakuna", valueStateDescriptor);
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            try {
                JobID jobId = autoCancellableJob.getJobId();
                clusterClient.submitJob(autoCancellableJob.getJobGraph()).get();
                CompletableFuture kvState = getKvState(plus, client, jobId, asQueryableState.getQueryableStateName(), 0, BasicTypeInfo.INT_TYPE_INFO, valueStateDescriptor, true, this.executor);
                Assertions.assertThatThrownBy(() -> {
                }).cause().isInstanceOf(UnknownKeyOrNamespaceException.class);
                if (autoCancellableJob != null) {
                    if (0 == 0) {
                        autoCancellableJob.close();
                        return;
                    }
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (autoCancellableJob != null) {
                if (th != null) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testValueStateShortcut() throws Exception {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        ValueStateDescriptor<Tuple2<Integer, Long>> valueStateDescriptor = (ValueStateDescriptor) executionEnvironment.addSource(new TestAscendingValueSource(1024L)).keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.9
            private static final long serialVersionUID = 9168901838808830068L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).asQueryableState("matata").getStateDescriptor();
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            JobID jobId = autoCancellableJob.getJobId();
            clusterClient.submitJob(autoCancellableJob.getJobGraph()).get();
            executeValueQuery(plus, client, jobId, "matata", valueStateDescriptor, 1024L);
            if (autoCancellableJob != null) {
                if (0 == 0) {
                    autoCancellableJob.close();
                    return;
                }
                try {
                    autoCancellableJob.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (autoCancellableJob != null) {
                if (0 != 0) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testReducingState() throws Exception {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        DataStreamSource addSource = executionEnvironment.addSource(new TestAscendingValueSource(1024L));
        ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("any", new SumReduce(), addSource.getType());
        addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.10
            private static final long serialVersionUID = 8470749712274833552L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).asQueryableState("jungle", reducingStateDescriptor);
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            try {
                JobID jobId = autoCancellableJob.getJobId();
                clusterClient.submitJob(autoCancellableJob.getJobGraph()).get();
                for (int i = 0; i < maxParallelism; i++) {
                    boolean z = false;
                    while (plus.hasTimeLeft() && !z) {
                        Tuple2 tuple2 = (Tuple2) ((ReducingState) getKvState(plus, client, jobId, "jungle", Integer.valueOf(i), BasicTypeInfo.INT_TYPE_INFO, reducingStateDescriptor, false, this.executor).get(plus.timeLeft().toMillis(), TimeUnit.MILLISECONDS)).get();
                        Assertions.assertThat(i).isEqualTo(((Integer) tuple2.f0).intValue()).withFailMessage("Key mismatch", new Object[0]);
                        if (524800 == ((Long) tuple2.f1).longValue()) {
                            z = true;
                        } else {
                            Thread.sleep(RETRY_TIMEOUT);
                        }
                    }
                    Assertions.assertThat(z).isTrue().withFailMessage("Did not succeed query", new Object[0]);
                }
                if (autoCancellableJob != null) {
                    if (0 == 0) {
                        autoCancellableJob.close();
                        return;
                    }
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (autoCancellableJob != null) {
                if (th != null) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testMapState() throws Exception {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        DataStreamSource addSource = executionEnvironment.addSource(new TestAscendingValueSource(1024L));
        final MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("timon", BasicTypeInfo.INT_TYPE_INFO, addSource.getType());
        mapStateDescriptor.setQueryable("timon-queryable");
        addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.12
            private static final long serialVersionUID = 8470749712274833552L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.11
            private static final long serialVersionUID = -805125545438296619L;
            private transient MapState<Integer, Tuple2<Integer, Long>> mapState;

            public void open(OpenContext openContext) throws Exception {
                super.open(openContext);
                this.mapState = getRuntimeContext().getMapState(mapStateDescriptor);
            }

            public void processElement(Tuple2<Integer, Long> tuple2, ProcessFunction<Tuple2<Integer, Long>, Object>.Context context, Collector<Object> collector) throws Exception {
                Tuple2 tuple22 = (Tuple2) this.mapState.get(tuple2.f0);
                if (tuple22 == null) {
                    tuple22 = new Tuple2(tuple2.f0, 0L);
                }
                this.mapState.put(tuple2.f0, new Tuple2(tuple22.f0, Long.valueOf(((Long) tuple22.f1).longValue() + ((Long) tuple2.f1).longValue())));
            }

            public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
                processElement((Tuple2<Integer, Long>) obj, (ProcessFunction<Tuple2<Integer, Long>, Object>.Context) context, (Collector<Object>) collector);
            }
        });
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            try {
                JobID jobId = autoCancellableJob.getJobId();
                clusterClient.submitJob(autoCancellableJob.getJobGraph()).get();
                for (int i = 0; i < maxParallelism; i++) {
                    boolean z = false;
                    while (plus.hasTimeLeft() && !z) {
                        Tuple2 tuple2 = (Tuple2) ((MapState) getKvState(plus, client, jobId, "timon-queryable", Integer.valueOf(i), BasicTypeInfo.INT_TYPE_INFO, mapStateDescriptor, false, this.executor).get(plus.timeLeft().toMillis(), TimeUnit.MILLISECONDS)).get(Integer.valueOf(i));
                        if (tuple2 == null || tuple2.f0 == null || 524800 != ((Long) tuple2.f1).longValue()) {
                            Thread.sleep(RETRY_TIMEOUT);
                        } else {
                            Assertions.assertThat(i).isEqualTo(((Integer) tuple2.f0).intValue()).withFailMessage("Key mismatch", new Object[0]);
                            z = true;
                        }
                    }
                    Assertions.assertThat(z).isTrue().withFailMessage("Did not succeed query", new Object[0]);
                }
                if (autoCancellableJob != null) {
                    if (0 == 0) {
                        autoCancellableJob.close();
                        return;
                    }
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (autoCancellableJob != null) {
                if (th != null) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testListState() throws Exception {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        DataStreamSource addSource = executionEnvironment.addSource(new TestAscendingValueSource(1024L));
        final ListStateDescriptor listStateDescriptor = new ListStateDescriptor("list", BasicTypeInfo.LONG_TYPE_INFO);
        listStateDescriptor.setQueryable("list-queryable");
        addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.14
            private static final long serialVersionUID = 8470749712274833552L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.13
            private static final long serialVersionUID = -805125545438296619L;
            private transient ListState<Long> listState;

            public void open(OpenContext openContext) throws Exception {
                super.open(openContext);
                this.listState = getRuntimeContext().getListState(listStateDescriptor);
            }

            public void processElement(Tuple2<Integer, Long> tuple2, ProcessFunction<Tuple2<Integer, Long>, Object>.Context context, Collector<Object> collector) throws Exception {
                this.listState.add(tuple2.f1);
            }

            public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
                processElement((Tuple2<Integer, Long>) obj, (ProcessFunction<Tuple2<Integer, Long>, Object>.Context) context, (Collector<Object>) collector);
            }
        });
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            JobID jobId = autoCancellableJob.getJobId();
            clusterClient.submitJob(autoCancellableJob.getJobGraph()).get();
            HashMap hashMap = new HashMap();
            for (int i = 0; i < maxParallelism; i++) {
                boolean z = false;
                while (plus.hasTimeLeft() && !z) {
                    Iterable iterable = (Iterable) ((ListState) getKvState(plus, client, jobId, "list-queryable", Integer.valueOf(i), BasicTypeInfo.INT_TYPE_INFO, listStateDescriptor, false, this.executor).get(plus.timeLeft().toMillis(), TimeUnit.MILLISECONDS)).get();
                    HashSet hashSet = new HashSet();
                    Iterator it = iterable.iterator();
                    while (it.hasNext()) {
                        hashSet.add((Long) it.next());
                    }
                    if (hashSet.size() == 1025) {
                        z = true;
                        hashMap.put(Integer.valueOf(i), hashSet);
                    } else {
                        Thread.sleep(RETRY_TIMEOUT);
                    }
                }
                Assertions.assertThat(z).isTrue().withFailMessage("Did not succeed query", new Object[0]);
            }
            for (int i2 = 0; i2 < maxParallelism; i2++) {
                Set set = (Set) hashMap.get(Integer.valueOf(i2));
                for (long j = 0; j <= 1024; j++) {
                    Assertions.assertThat(set).contains(new Long[]{Long.valueOf(j)});
                }
            }
            if (autoCancellableJob != null) {
                if (0 == 0) {
                    autoCancellableJob.close();
                    return;
                }
                try {
                    autoCancellableJob.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (autoCancellableJob != null) {
                if (0 != 0) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testAggregatingState() throws Exception {
        Deadline plus = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(this.stateBackend);
        executionEnvironment.setParallelism(maxParallelism);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        DataStreamSource addSource = executionEnvironment.addSource(new TestAscendingValueSource(1024L));
        AggregatingStateDescriptor aggregatingStateDescriptor = new AggregatingStateDescriptor("aggregates", new SumAggr(), String.class);
        aggregatingStateDescriptor.setQueryable("aggr-queryable");
        addSource.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { // from class: org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.15
            private static final long serialVersionUID = 8470749712274833552L;

            public Integer getKey(Tuple2<Integer, Long> tuple2) {
                return (Integer) tuple2.f0;
            }
        }).transform("TestAggregatingOperator", BasicTypeInfo.STRING_TYPE_INFO, new AggregatingTestOperator(aggregatingStateDescriptor));
        AutoCancellableJob autoCancellableJob = new AutoCancellableJob(plus, clusterClient, executionEnvironment);
        Throwable th = null;
        try {
            JobID jobId = autoCancellableJob.getJobId();
            clusterClient.submitJob(autoCancellableJob.getJobGraph()).get();
            for (int i = 0; i < maxParallelism; i++) {
                boolean z = false;
                while (plus.hasTimeLeft() && !z) {
                    if (Long.parseLong((String) ((AggregatingState) getKvState(plus, client, jobId, "aggr-queryable", Integer.valueOf(i), BasicTypeInfo.INT_TYPE_INFO, aggregatingStateDescriptor, false, this.executor).get(plus.timeLeft().toMillis(), TimeUnit.MILLISECONDS)).get()) == 524800) {
                        z = true;
                    } else {
                        Thread.sleep(RETRY_TIMEOUT);
                    }
                }
                Assertions.assertThat(z).isTrue().withFailMessage("Did not succeed query", new Object[0]);
            }
            if (autoCancellableJob != null) {
                if (0 == 0) {
                    autoCancellableJob.close();
                    return;
                }
                try {
                    autoCancellableJob.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (autoCancellableJob != null) {
                if (0 != 0) {
                    try {
                        autoCancellableJob.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    autoCancellableJob.close();
                }
            }
            throw th3;
        }
    }

    private static <K, S extends State, V> CompletableFuture<S> getKvState(Deadline deadline, QueryableStateClient queryableStateClient, JobID jobID, String str, K k, TypeInformation<K> typeInformation, StateDescriptor<S, V> stateDescriptor, boolean z, ScheduledExecutor scheduledExecutor) {
        CompletableFuture<S> completableFuture = new CompletableFuture<>();
        getKvStateIgnoringCertainExceptions(deadline, completableFuture, queryableStateClient, jobID, str, k, typeInformation, stateDescriptor, z, scheduledExecutor);
        return completableFuture;
    }

    private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(Deadline deadline, CompletableFuture<S> completableFuture, QueryableStateClient queryableStateClient, JobID jobID, String str, K k, TypeInformation<K> typeInformation, StateDescriptor<S, V> stateDescriptor, boolean z, ScheduledExecutor scheduledExecutor) {
        if (completableFuture.isDone()) {
            return;
        }
        CompletableFuture kvState = queryableStateClient.getKvState(jobID, str, k, typeInformation, stateDescriptor);
        kvState.whenCompleteAsync((state, th) -> {
            if (th == null) {
                completableFuture.complete(state);
                return;
            }
            if ((th.getCause() instanceof CancellationException) || (th.getCause() instanceof AssertionError) || (z && (th.getCause() instanceof UnknownKeyOrNamespaceException))) {
                completableFuture.completeExceptionally(th.getCause());
            } else if (deadline.hasTimeLeft()) {
                getKvStateIgnoringCertainExceptions(deadline, completableFuture, queryableStateClient, jobID, str, k, typeInformation, stateDescriptor, z, scheduledExecutor);
            }
        }, (Executor) scheduledExecutor);
        completableFuture.whenComplete((state2, th2) -> {
            kvState.cancel(false);
        });
    }

    private void executeValueQuery(Deadline deadline, QueryableStateClient queryableStateClient, JobID jobID, String str, ValueStateDescriptor<Tuple2<Integer, Long>> valueStateDescriptor, long j) throws Exception {
        for (int i = 0; i < maxParallelism; i++) {
            boolean z = false;
            while (deadline.hasTimeLeft() && !z) {
                Tuple2 tuple2 = (Tuple2) ((ValueState) getKvState(deadline, queryableStateClient, jobID, str, Integer.valueOf(i), BasicTypeInfo.INT_TYPE_INFO, valueStateDescriptor, false, this.executor).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)).value();
                Assertions.assertThat(i).isEqualTo(((Integer) tuple2.f0).intValue()).withFailMessage("Key mismatch", new Object[0]);
                if (j == ((Long) tuple2.f1).longValue()) {
                    z = true;
                } else {
                    Thread.sleep(RETRY_TIMEOUT);
                }
            }
            Assertions.assertThat(z).isTrue().withFailMessage("Did not succeed query", new Object[0]);
        }
    }

    private static URLClassLoader createLoaderWithCustomKryoSerializer(String str) throws IOException {
        return ClassLoaderUtils.compileAndLoadJava(classloaderFolder, str + ".java", "import com.esotericsoftware.kryo.Kryo;\nimport com.esotericsoftware.kryo.Serializer;\nimport com.esotericsoftware.kryo.io.Input;\nimport com.esotericsoftware.kryo.io.Output;\nimport java.io.Serializable;\npublic class " + str + " extends Serializer<Byte> implements Serializable {\n    @Override\n    public void write(Kryo kryo, Output output, Byte testJob) {}\n    @Override\n    public Byte read(Kryo kryo, Input input, Class<Byte> aClass) {\n        return null;\n    }\n}\n");
    }
}
