package org.apache.flink.test.query;

import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.OnSuccess;
import akka.dispatch.Recover;
import akka.pattern.Patterns;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.query.QueryableStateClient;
import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;

/* loaded from: input_file:org/apache/flink/test/query/AbstractQueryableStateITCase.class */
public abstract class AbstractQueryableStateITCase extends TestLogger {
    /** Time budget from which every test derives its {@link Deadline} via {@code fromNow()}. */
    private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
    /** Delay handed to {@code getKvStateWithRetries} before a failed query is attempted again. */
    private static final FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS);
    /** Actor system created in {@code setup()}; provides the dispatcher and scheduler used for query futures. */
    private static ActorSystem TEST_ACTOR_SYSTEM;
    /** Value written to the "local.number-taskmanager" setting in {@code setup()}. */
    private static final int NUM_TMS = 2;
    /** Value written to "taskmanager.numberOfTaskSlots" (and also to "taskmanager.memory.size") in {@code setup()}. */
    private static final int NUM_SLOTS_PER_TM = 4;
    /** Parallelism of every test job; numerically NUM_TMS * NUM_SLOTS_PER_TM. */
    private static final int NUM_SLOTS = 8;
    /** State backend used by the test jobs; assigned from {@code createStateBackend()} before each test. */
    protected AbstractStateBackend stateBackend;
    /** Cluster shared by all tests; started in {@code setup()} and shut down in {@code tearDown()}. */
    private static TestingCluster cluster;

    /* loaded from: input_file:org/apache/flink/test/query/AbstractQueryableStateITCase$SumFold.class */
    private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String> {
        private SumFold() {
        }

        public String fold(String str, Tuple2<Integer, Long> tuple2) throws Exception {
            return Long.toString(Long.valueOf(str).longValue() + ((Long) tuple2.f1).longValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/test/query/AbstractQueryableStateITCase$SumReduce.class */
    private static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>> {
        private SumReduce() {
        }

        public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> tuple2, Tuple2<Integer, Long> tuple22) throws Exception {
            tuple2.f1 = Long.valueOf(((Long) tuple2.f1).longValue() + ((Long) tuple22.f1).longValue());
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/query/AbstractQueryableStateITCase$TestAscendingValueSource.class */
    private static class TestAscendingValueSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> {
        private final long maxValue;
        private volatile boolean isRunning = true;

        public TestAscendingValueSource(long j) {
            Preconditions.checkArgument(j >= 0);
            this.maxValue = j;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Long>> sourceContext) throws Exception {
            Tuple2 tuple2 = new Tuple2(Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), 0L);
            long j = 0;
            while (true) {
                long j2 = j;
                if (!this.isRunning || j2 > this.maxValue) {
                    break;
                }
                synchronized (sourceContext.getCheckpointLock()) {
                    tuple2.f1 = Long.valueOf(j2);
                    sourceContext.collect(tuple2);
                }
                j = j2 + 1;
            }
            while (this.isRunning) {
                synchronized (this) {
                    wait();
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
            synchronized (this) {
                notifyAll();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/query/AbstractQueryableStateITCase$TestKeyRangeSource.class */
    private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> implements CheckpointListener {
        private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong();
        private final int numKeys;
        private final ThreadLocalRandom random = ThreadLocalRandom.current();
        private volatile boolean isRunning = true;

        public TestKeyRangeSource(int i) {
            this.numKeys = i;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                LATEST_CHECKPOINT_ID.set(0L);
            }
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Long>> sourceContext) throws Exception {
            Tuple2 tuple2 = new Tuple2(0, 1L);
            while (this.isRunning) {
                synchronized (sourceContext.getCheckpointLock()) {
                    tuple2.f0 = Integer.valueOf(this.random.nextInt(this.numKeys));
                    sourceContext.collect(tuple2);
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                LATEST_CHECKPOINT_ID.set(j);
            }
        }
    }

    /**
     * Starts the shared {@link TestingCluster} with the queryable state server enabled and
     * creates the actor system used by the tests. Any exception fails the test class.
     */
    @BeforeClass
    public static void setup() {
        try {
            Configuration config = new Configuration();

            // Cluster layout.
            config.setInteger("local.number-taskmanager", NUM_TMS);
            config.setInteger("taskmanager.numberOfTaskSlots", NUM_SLOTS_PER_TM);
            // NOTE(review): the memory size reuses NUM_SLOTS_PER_TM (4); this looks like an
            // inlined literal rather than an intentional coupling - confirm.
            config.setInteger("taskmanager.memory.size", NUM_SLOTS_PER_TM);

            // Queryable state server and client.
            config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
            config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
            config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);

            TestingCluster startedCluster = new TestingCluster(config, false);
            cluster = startedCluster;
            startedCluster.start(true);

            TEST_ACTOR_SYSTEM = AkkaUtils.createDefaultActorSystem();
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Shuts down the shared cluster and the test actor system.
     *
     * <p>The actor system is shut down in a {@code finally} block so that it is released even
     * when stopping the cluster fails. The cluster is null-checked because {@code setup()} may
     * have failed before assigning it.
     */
    @AfterClass
    public static void tearDown() {
        try {
            if (cluster != null) {
                cluster.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            if (TEST_ACTOR_SYSTEM != null) {
                TEST_ACTOR_SYSTEM.shutdown();
            }
        }
    }

    /**
     * Obtains a state backend from {@link #createStateBackend()} before every test and stores
     * it in {@link #stateBackend}.
     */
    @Before
    public void setUp() throws Exception {
        AbstractStateBackend backend = createStateBackend();
        stateBackend = backend;
    }

    /**
     * Supplies the state backend that the test jobs are configured with. Called from
     * {@code setUp()} before each test.
     *
     * @return the state backend to use for the next test
     * @throws Exception if the backend cannot be created
     */
    protected abstract AbstractStateBackend createStateBackend() throws Exception;

    /**
     * Runs a job that sums random-key records into a queryable reducing state named
     * "hakuna-matata" and queries every key until each one has reported a count greater than
     * zero, or the deadline expires.
     *
     * <p>The job is cancelled and the client shut down in {@code finally} blocks so that
     * cleanup runs exactly once and the client is released even if cancellation fails.
     */
    @Test
    public void testQueryableState() throws Exception {
        final int numKeys = 256;
        final Deadline fromNow = TEST_TIMEOUT.fromNow();
        final QueryableStateClient queryableStateClient = new QueryableStateClient(cluster.configuration());
        JobID jobID = null;
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.setParallelism(NUM_SLOTS);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

            DataStreamSource<Tuple2<Integer, Long>> source =
                    executionEnvironment.addSource(new TestKeyRangeSource(numKeys));
            final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source
                    .keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
                            return value.f0;
                        }
                    })
                    .asQueryableState(
                            "hakuna-matata",
                            new ReducingStateDescriptor<Tuple2<Integer, Long>>("any-name", new SumReduce(), source.getType()));

            JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
            cluster.submitJobDetached(jobGraph);
            jobID = jobGraph.getJobID();

            // counts[key] holds the last value observed for that key; 0 means "not seen yet".
            final AtomicLongArray counts = new AtomicLongArray(numKeys);
            boolean allNonZero = false;
            while (!allNonZero && fromNow.hasTimeLeft()) {
                allNonZero = true;
                ArrayList<Future<byte[]>> pending = new ArrayList<Future<byte[]>>(numKeys);
                for (int i = 0; i < numKeys; i++) {
                    final int key = i;
                    if (counts.get(key) > 0) {
                        // Already observed; no need to query this key again.
                        continue;
                    }
                    allNonZero = false;
                    byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
                            Integer.valueOf(key),
                            queryableState.getKeySerializer(),
                            VoidNamespace.INSTANCE,
                            VoidNamespaceSerializer.INSTANCE);
                    Future<byte[]> result = getKvStateWithRetries(
                            queryableStateClient, jobID, "hakuna-matata", key, serializedKey, QUERY_RETRY_DELAY, false);
                    result.onSuccess(new OnSuccess<byte[]>() {
                        @Override
                        public void onSuccess(byte[] serializedValue) throws Throwable {
                            Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
                                    serializedValue, queryableState.getValueSerializer());
                            counts.set(key, value.f1);
                            Assert.assertEquals("Key mismatch", key, value.f0.intValue());
                        }
                    }, TEST_ACTOR_SYSTEM.dispatcher());
                    pending.add(result);
                }
                // Wait for this round of queries before deciding whether another round is needed.
                Await.ready(Futures.sequence(pending, TEST_ACTOR_SYSTEM.dispatcher()), fromNow.timeLeft());
            }

            Assert.assertTrue("Not all keys are non-zero", allNonZero);
            for (int i = 0; i < numKeys; i++) {
                long count = counts.get(i);
                Assert.assertTrue("Count at position " + i + " is " + count, count > 0);
            }
        } finally {
            try {
                if (jobID != null) {
                    Await.ready(
                            cluster.getLeaderGateway(fromNow.timeLeft())
                                    .ask(new JobManagerMessages.CancelJob(jobID), fromNow.timeLeft())
                                    .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationSuccess.class)),
                            fromNow.timeLeft());
                }
            } finally {
                // Always release the client, even if cancelling the job failed.
                queryableStateClient.shutDown();
            }
        }
    }

    /**
     * Registers two queryable states under the same name ("duplicate-me") and verifies that the
     * job reaches {@code FAILED}, that the reported failure cause starts with
     * {@code SuppressRestartsException}, and that its nested cause is an
     * {@code IllegalStateException} mentioning the registration name.
     *
     * <p>Job cancellation runs in a {@code finally} block so that it is attempted exactly once.
     */
    @Test
    public void testDuplicateRegistrationFailsJob() throws Exception {
        final String causedByMarker = "Caused by: ";
        Deadline fromNow = TEST_TIMEOUT.fromNow();
        JobID jobID = null;
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.setParallelism(NUM_SLOTS);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

            DataStreamSource<Tuple2<Integer, Long>> source =
                    executionEnvironment.addSource(new TestKeyRangeSource(256));

            // First registration under "duplicate-me".
            source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
                    return value.f0;
                }
            }).asQueryableState(
                    "duplicate-me",
                    new ReducingStateDescriptor<Tuple2<Integer, Long>>("any-name", new SumReduce(), source.getType()));

            // Second registration under the same name.
            source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
                    return value.f0;
                }
            }).asQueryableState("duplicate-me");

            JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
            jobID = jobGraph.getJobID();

            // Subscribe to the FAILED notification before submitting so it cannot be missed.
            Future<TestingJobManagerMessages.JobStatusIs> failedFuture = cluster
                    .getLeaderGateway(fromNow.timeLeft())
                    .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobID, JobStatus.FAILED), fromNow.timeLeft())
                    .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class));

            cluster.submitJobDetached(jobGraph);

            TestingJobManagerMessages.JobStatusIs jobStatus = Await.result(failedFuture, fromNow.timeLeft());
            Assert.assertEquals(JobStatus.FAILED, jobStatus.state());

            Future<JobManagerMessages.JobFound> jobFoundFuture = cluster
                    .getLeaderGateway(fromNow.timeLeft())
                    .ask(new JobManagerMessages.RequestJob(jobID), fromNow.timeLeft())
                    .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class));
            JobManagerMessages.JobFound jobFound = Await.result(jobFoundFuture, fromNow.timeLeft());
            String failureCause = jobFound.executionGraph().getFailureCauseAsString();

            Assert.assertTrue(
                    "Not instance of SuppressRestartsException",
                    failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));

            // Without this check a missing marker would silently yield a meaningless substring.
            int causeIndex = failureCause.indexOf(causedByMarker);
            Assert.assertTrue("Failure cause has no nested cause", causeIndex >= 0);
            String nestedCause = failureCause.substring(causeIndex + causedByMarker.length());

            Assert.assertTrue("Not caused by IllegalStateException", nestedCause.startsWith("java.lang.IllegalStateException"));
            Assert.assertTrue("Exception does not contain registration name", nestedCause.contains("duplicate-me"));
        } finally {
            if (jobID != null) {
                Await.ready(
                        cluster.getLeaderGateway(fromNow.timeLeft())
                                .ask(new JobManagerMessages.CancelJob(jobID), fromNow.timeLeft())
                                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationSuccess.class)),
                        fromNow.timeLeft());
            }
        }
    }

    /**
     * Exposes a value state named "hakuna" fed by {@link TestAscendingValueSource} and polls
     * it through {@code executeValueQuery} until every key reports the final value 1024.
     *
     * <p>The job is cancelled and the client shut down in {@code finally} blocks so that
     * cleanup runs exactly once and the client is released even if cancellation fails.
     */
    @Test
    public void testValueState() throws Exception {
        final long numElements = 1024L;
        Deadline fromNow = TEST_TIMEOUT.fromNow();
        QueryableStateClient queryableStateClient = new QueryableStateClient(cluster.configuration());
        JobID jobID = null;
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.setParallelism(NUM_SLOTS);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

            DataStreamSource<Tuple2<Integer, Long>> source =
                    executionEnvironment.addSource(new TestAscendingValueSource(numElements));
            QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source
                    .keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
                            return value.f0;
                        }
                    })
                    .asQueryableState("hakuna", new ValueStateDescriptor<Tuple2<Integer, Long>>("any", source.getType()));

            JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
            jobID = jobGraph.getJobID();
            cluster.submitJobDetached(jobGraph);

            executeValueQuery(fromNow, queryableStateClient, jobID, queryableState, numElements);
        } finally {
            try {
                if (jobID != null) {
                    Await.ready(
                            cluster.getLeaderGateway(fromNow.timeLeft())
                                    .ask(new JobManagerMessages.CancelJob(jobID), fromNow.timeLeft())
                                    .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationSuccess.class)),
                            fromNow.timeLeft());
                }
            } finally {
                // Always release the client, even if cancelling the job failed.
                queryableStateClient.shutDown();
            }
        }
    }

    /**
     * Issues one query against the job before it has been submitted (the result is deliberately
     * ignored), then submits the job and verifies through {@code executeValueQuery} that
     * subsequent queries still reach the final value 1024 for every key.
     *
     * <p>The job is cancelled and the client shut down in {@code finally} blocks so that
     * cleanup runs exactly once and the client is released even if cancellation fails.
     */
    @Test
    public void testQueryNonStartedJobState() throws Exception {
        final long numElements = 1024L;
        Deadline fromNow = TEST_TIMEOUT.fromNow();
        QueryableStateClient queryableStateClient = new QueryableStateClient(cluster.configuration());
        JobID jobID = null;
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.setParallelism(NUM_SLOTS);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

            DataStreamSource<Tuple2<Integer, Long>> source =
                    executionEnvironment.addSource(new TestAscendingValueSource(numElements));
            QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source
                    .keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
                            return value.f0;
                        }
                    })
                    .asQueryableState("hakuna", new ValueStateDescriptor<Tuple2<Integer, Long>>("any", source.getType(), null));

            JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
            jobID = jobGraph.getJobID();

            // Query before the job exists; the returned future is intentionally not awaited.
            byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
                    0, queryableState.getKeySerializer(), VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
            queryableStateClient.getKvState(jobID, queryableState.getQueryableStateName(), 0, serializedKey);

            cluster.submitJobDetached(jobGraph);

            executeValueQuery(fromNow, queryableStateClient, jobID, queryableState, numElements);
        } finally {
            try {
                if (jobID != null) {
                    Await.ready(
                            cluster.getLeaderGateway(fromNow.timeLeft())
                                    .ask(new JobManagerMessages.CancelJob(jobID), fromNow.timeLeft())
                                    .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationSuccess.class)),
                            fromNow.timeLeft());
                }
            } finally {
                // Always release the client, even if cancelling the job failed.
                queryableStateClient.shutDown();
            }
        }
    }

    /**
     * Polls the queryable state for each key in {@code [0, NUM_SLOTS)} until the {@code f1}
     * field of the returned tuple equals the expected value, pausing 50 ms between
     * mismatching attempts.
     *
     * @param deadline deadline bounding all queries
     * @param queryableStateClient client used to issue the queries
     * @param jobID job to query
     * @param queryableStateStream stream providing the state name and the key/value serializers
     * @param j expected {@code f1} value for every key
     * @throws Exception if a query fails or does not complete in the remaining time
     * @throws AssertionError if a returned key differs from the queried key, or if the
     *         expected value was not observed before the deadline
     */
    private void executeValueQuery(Deadline deadline, QueryableStateClient queryableStateClient, JobID jobID, QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableStateStream, long j) throws Exception {
        for (int key = 0; key < NUM_SLOTS; key++) {
            final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
                    Integer.valueOf(key),
                    queryableStateStream.getKeySerializer(),
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE);

            boolean matched = false;
            while (deadline.hasTimeLeft() && !matched) {
                Future<byte[]> pendingResult = getKvStateWithRetries(
                        queryableStateClient,
                        jobID,
                        queryableStateStream.getQueryableStateName(),
                        key,
                        serializedKey,
                        QUERY_RETRY_DELAY,
                        false);
                byte[] serializedValue = Await.result(pendingResult, deadline.timeLeft());
                Tuple2<Integer, Long> current = KvStateRequestSerializer.deserializeValue(
                        serializedValue, queryableStateStream.getValueSerializer());

                Assert.assertEquals("Key mismatch", key, current.f0.intValue());

                matched = (current.f1.longValue() == j);
                if (!matched) {
                    // Value not there yet; back off briefly before asking again.
                    Thread.sleep(50L);
                }
            }
            Assert.assertTrue("Did not succeed query", matched);
        }
    }

    /**
     * Expects {@link UnknownKeyOrNamespace} when querying a key that never receives data: the
     * key selector maps every record to key 1, the query asks for key 0, and the retry helper
     * is told to fail (rather than retry) on {@code UnknownKeyOrNamespace}.
     *
     * <p>The job is cancelled and the client shut down in {@code finally} blocks so that
     * cleanup runs exactly once and the client is released even if cancellation fails.
     */
    @Test(expected = UnknownKeyOrNamespace.class)
    public void testValueStateDefault() throws Exception, UnknownKeyOrNamespace {
        Deadline fromNow = TEST_TIMEOUT.fromNow();
        QueryableStateClient queryableStateClient = new QueryableStateClient(cluster.configuration());
        JobID jobID = null;
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.setParallelism(NUM_SLOTS);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

            DataStreamSource<Tuple2<Integer, Long>> source =
                    executionEnvironment.addSource(new TestAscendingValueSource(1024L));
            QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source
                    .keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
                            // Every record goes to key 1, so key 0 never has state.
                            return 1;
                        }
                    })
                    .asQueryableState(
                            "hakuna",
                            new ValueStateDescriptor<Tuple2<Integer, Long>>("any", source.getType(), Tuple2.of(0, 1337L)));

            JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
            jobID = jobGraph.getJobID();
            cluster.submitJobDetached(jobGraph);

            byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
                    0, queryableState.getKeySerializer(), VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
            Future<byte[]> result = getKvStateWithRetries(
                    queryableStateClient, jobID, queryableState.getQueryableStateName(), 0, serializedKey, QUERY_RETRY_DELAY, true);
            Await.result(result, fromNow.timeLeft());
        } finally {
            try {
                if (jobID != null) {
                    Await.ready(
                            cluster.getLeaderGateway(fromNow.timeLeft())
                                    .ask(new JobManagerMessages.CancelJob(jobID), fromNow.timeLeft())
                                    .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationSuccess.class)),
                            fromNow.timeLeft());
                }
            } finally {
                // Always release the client, even if cancelling the job failed.
                queryableStateClient.shutDown();
            }
        }
    }

    /**
     * Same scenario as the value-state test, but registers the queryable state through the
     * single-argument {@code asQueryableState("matata")} overload, which takes no state
     * descriptor.
     *
     * <p>The job is cancelled and the client shut down in {@code finally} blocks so that
     * cleanup runs exactly once and the client is released even if cancellation fails.
     */
    @Test
    public void testValueStateShortcut() throws Exception {
        final long numElements = 1024L;
        Deadline fromNow = TEST_TIMEOUT.fromNow();
        QueryableStateClient queryableStateClient = new QueryableStateClient(cluster.configuration());
        JobID jobID = null;
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.setParallelism(NUM_SLOTS);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

            DataStreamSource<Tuple2<Integer, Long>> source =
                    executionEnvironment.addSource(new TestAscendingValueSource(numElements));
            QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source
                    .keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
                            return value.f0;
                        }
                    })
                    .asQueryableState("matata");

            JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
            jobID = jobGraph.getJobID();
            cluster.submitJobDetached(jobGraph);

            executeValueQuery(fromNow, queryableStateClient, jobID, queryableState, numElements);
        } finally {
            try {
                if (jobID != null) {
                    Await.ready(
                            cluster.getLeaderGateway(fromNow.timeLeft())
                                    .ask(new JobManagerMessages.CancelJob(jobID), fromNow.timeLeft())
                                    .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationSuccess.class)),
                            fromNow.timeLeft());
                }
            } finally {
                // Always release the client, even if cancelling the job failed.
                queryableStateClient.shutDown();
            }
        }
    }

    /**
     * Exposes a folding state named "pumba" that sums the emitted values into a decimal string
     * and polls each key until it reports the sum of 0..1024, i.e. "524800".
     *
     * <p>The job is cancelled and the client shut down in {@code finally} blocks so that
     * cleanup runs exactly once and the client is released even if cancellation fails.
     */
    @Test
    public void testFoldingState() throws Exception {
        final long numElements = 1024L;
        // Sum of 0..numElements (Gauss): 1024 * 1025 / 2 = 524800.
        final String expected = Long.toString(numElements * (numElements + 1) / 2);

        Deadline fromNow = TEST_TIMEOUT.fromNow();
        QueryableStateClient queryableStateClient = new QueryableStateClient(cluster.configuration());
        JobID jobID = null;
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.setParallelism(NUM_SLOTS);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

            DataStreamSource<Tuple2<Integer, Long>> source =
                    executionEnvironment.addSource(new TestAscendingValueSource(numElements));
            QueryableStateStream<Integer, String> queryableState = source
                    .keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
                            return value.f0;
                        }
                    })
                    .asQueryableState(
                            "pumba",
                            new FoldingStateDescriptor<Tuple2<Integer, Long>, String>("any", "0", new SumFold(), StringSerializer.INSTANCE));

            JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
            jobID = jobGraph.getJobID();
            cluster.submitJobDetached(jobGraph);

            for (int key = 0; key < NUM_SLOTS; key++) {
                byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
                        Integer.valueOf(key),
                        queryableState.getKeySerializer(),
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE);

                boolean success = false;
                while (fromNow.hasTimeLeft() && !success) {
                    Future<byte[]> result = getKvStateWithRetries(
                            queryableStateClient, jobID, queryableState.getQueryableStateName(), key, serializedKey, QUERY_RETRY_DELAY, false);
                    byte[] serializedValue = Await.result(result, fromNow.timeLeft());
                    String value = KvStateRequestSerializer.deserializeValue(serializedValue, queryableState.getValueSerializer());
                    if (expected.equals(value)) {
                        success = true;
                    } else {
                        // Sum not complete yet; back off briefly before asking again.
                        Thread.sleep(50L);
                    }
                }
                Assert.assertTrue("Did not succeed query", success);
            }
        } finally {
            try {
                if (jobID != null) {
                    Await.ready(
                            cluster.getLeaderGateway(fromNow.timeLeft())
                                    .ask(new JobManagerMessages.CancelJob(jobID), fromNow.timeLeft())
                                    .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationSuccess.class)),
                            fromNow.timeLeft());
                }
            } finally {
                // Always release the client, even if cancelling the job failed.
                queryableStateClient.shutDown();
            }
        }
    }

    /**
     * Exposes a reducing state named "jungle" that sums the emitted values and polls it through
     * {@code executeValueQuery} until every key reports the sum of 0..1024, i.e. 524800.
     *
     * <p>The job is cancelled and the client shut down in {@code finally} blocks so that
     * cleanup runs exactly once and the client is released even if cancellation fails.
     */
    @Test
    public void testReducingState() throws Exception {
        final long numElements = 1024L;
        // Sum of 0..numElements (Gauss): 1024 * 1025 / 2 = 524800.
        final long expected = numElements * (numElements + 1) / 2;

        Deadline fromNow = TEST_TIMEOUT.fromNow();
        QueryableStateClient queryableStateClient = new QueryableStateClient(cluster.configuration());
        JobID jobID = null;
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.setParallelism(NUM_SLOTS);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

            DataStreamSource<Tuple2<Integer, Long>> source =
                    executionEnvironment.addSource(new TestAscendingValueSource(numElements));
            QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source
                    .keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
                            return value.f0;
                        }
                    })
                    .asQueryableState(
                            "jungle",
                            new ReducingStateDescriptor<Tuple2<Integer, Long>>("any", new SumReduce(), source.getType()));

            JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
            jobID = jobGraph.getJobID();
            cluster.submitJobDetached(jobGraph);

            executeValueQuery(fromNow, queryableStateClient, jobID, queryableState, expected);
        } finally {
            try {
                if (jobID != null) {
                    Await.ready(
                            cluster.getLeaderGateway(fromNow.timeLeft())
                                    .ask(new JobManagerMessages.CancelJob(jobID), fromNow.timeLeft())
                                    .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationSuccess.class)),
                            fromNow.timeLeft());
                }
            } finally {
                // Always release the client, even if cancelling the job failed.
                queryableStateClient.shutDown();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Queries the given key and transparently retries after {@code finiteDuration} whenever the
     * query fails, until it succeeds or hits a non-retryable failure.
     *
     * <p>Non-retryable failures are {@link AssertionError} and, when {@code z} is set,
     * {@link UnknownKeyOrNamespace}; these are propagated through the returned future.
     *
     * @param queryableStateClient client used to issue the query
     * @param jobID job to query
     * @param str name of the queryable state
     * @param i key hash code passed to the client
     * @param bArr serialized key and namespace
     * @param finiteDuration delay before a failed query is retried
     * @param z whether {@code UnknownKeyOrNamespace} should fail the future instead of being retried
     * @return future holding the serialized state value
     */
    public static Future<byte[]> getKvStateWithRetries(final QueryableStateClient queryableStateClient, final JobID jobID, final String str, final int i, final byte[] bArr, final FiniteDuration finiteDuration, final boolean z) {
        return queryableStateClient.getKvState(jobID, str, i, bArr).recoverWith(new Recover<Future<byte[]>>() {
            @Override
            public Future<byte[]> recover(Throwable th) throws Throwable {
                if (th instanceof AssertionError) {
                    // Test assertions must surface immediately rather than be retried.
                    return Futures.failed(th);
                }
                if (z && th instanceof UnknownKeyOrNamespace) {
                    return Futures.failed(th);
                }
                // Any other failure: schedule another attempt after the retry delay.
                return Patterns.after(
                        finiteDuration,
                        AbstractQueryableStateITCase.TEST_ACTOR_SYSTEM.scheduler(),
                        AbstractQueryableStateITCase.TEST_ACTOR_SYSTEM.dispatcher(),
                        new Callable<Future<byte[]>>() {
                            @Override
                            public Future<byte[]> call() throws Exception {
                                return AbstractQueryableStateITCase.getKvStateWithRetries(queryableStateClient, jobID, str, i, bArr, finiteDuration, z);
                            }
                        });
            }
        }, TEST_ACTOR_SYSTEM.dispatcher());
    }
}
