package org.apache.flink.test.checkpointing;

import io.netty.util.internal.ConcurrentSet;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase.class */
public class RescalingITCase extends TestLogger {
    private static final int numTaskManagers = 2;
    private static final int slotsPerTaskManager = 2;
    private static final int numSlots = 4;
    private static TestingCluster cluster;

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.test.checkpointing.RescalingITCase$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$test$checkpointing$RescalingITCase$OperatorCheckpointMethod = new int[OperatorCheckpointMethod.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$RescalingITCase$OperatorCheckpointMethod[OperatorCheckpointMethod.CHECKPOINTED_FUNCTION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$RescalingITCase$OperatorCheckpointMethod[OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$RescalingITCase$OperatorCheckpointMethod[OperatorCheckpointMethod.LIST_CHECKPOINTED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$RescalingITCase$OperatorCheckpointMethod[OperatorCheckpointMethod.NON_PARTITIONED.ordinal()] = RescalingITCase.numSlots;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase$CollectionSink.class */
    public static class CollectionSink<IN> implements SinkFunction<IN> {
        private static ConcurrentSet<Object> elements = new ConcurrentSet<>();
        private static final long serialVersionUID = -1652452958040267745L;

        private CollectionSink() {
        }

        public static <IN> Set<IN> getElementsSet() {
            return elements;
        }

        public static void clearElementsSet() {
            elements.clear();
        }

        public void invoke(IN in) throws Exception {
            elements.add(in);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase$NonPartitionedStateSource.class */
    public static class NonPartitionedStateSource extends StateSourceBase implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = -8108185918123186841L;

        private NonPartitionedStateSource() {
            super();
        }

        public List<Integer> snapshotState(long j, long j2) throws Exception {
            return Collections.singletonList(Integer.valueOf(this.counter));
        }

        public void restoreState(List<Integer> list) throws Exception {
            if (list.isEmpty()) {
                return;
            }
            this.counter = list.get(0).intValue();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase$OperatorCheckpointMethod.class */
    public enum OperatorCheckpointMethod {
        NON_PARTITIONED,
        CHECKPOINTED_FUNCTION,
        CHECKPOINTED_FUNCTION_BROADCAST,
        LIST_CHECKPOINTED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase$PartitionedStateSource.class */
    public static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction, CheckpointedRestoring<Integer> {
        private static final long serialVersionUID = -359715965103593462L;
        private static final int NUM_PARTITIONS = 7;
        private ListState<Integer> counterPartitions;
        private boolean broadcast;
        private static int[] CHECK_CORRECT_SNAPSHOT;
        private static int[] CHECK_CORRECT_RESTORE;

        public PartitionedStateSource(boolean z) {
            super();
            this.broadcast = z;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.counterPartitions.clear();
            CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = this.counter;
            int i = this.counter / NUM_PARTITIONS;
            int i2 = this.counter % NUM_PARTITIONS;
            for (int i3 = 0; i3 < NUM_PARTITIONS; i3++) {
                int i4 = i;
                if (i2 > 0) {
                    i2--;
                    i4++;
                }
                this.counterPartitions.add(Integer.valueOf(i4));
            }
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            if (this.broadcast) {
                this.counterPartitions = functionInitializationContext.getOperatorStateStore().getBroadcastSerializableListState("counter_partitions");
            } else {
                this.counterPartitions = functionInitializationContext.getOperatorStateStore().getSerializableListState("counter_partitions");
            }
            if (functionInitializationContext.isRestored()) {
                Iterator it = ((Iterable) this.counterPartitions.get()).iterator();
                while (it.hasNext()) {
                    this.counter += ((Integer) it.next()).intValue();
                }
                CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = this.counter;
            }
        }

        public void restoreState(Integer num) throws Exception {
            this.counterPartitions.add(num);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase$PartitionedStateSourceListCheckpointed.class */
    public static class PartitionedStateSourceListCheckpointed extends StateSourceBase implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = -4357864582992546L;
        private static final int NUM_PARTITIONS = 7;
        private static int[] CHECK_CORRECT_SNAPSHOT;
        private static int[] CHECK_CORRECT_RESTORE;

        private PartitionedStateSourceListCheckpointed() {
            super();
        }

        public List<Integer> snapshotState(long j, long j2) throws Exception {
            CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = this.counter;
            int i = this.counter / NUM_PARTITIONS;
            int i2 = this.counter % NUM_PARTITIONS;
            ArrayList arrayList = new ArrayList();
            for (int i3 = 0; i3 < NUM_PARTITIONS; i3++) {
                int i4 = i;
                if (i2 > 0) {
                    i2--;
                    i4++;
                }
                arrayList.add(Integer.valueOf(i4));
            }
            return arrayList;
        }

        public void restoreState(List<Integer> list) throws Exception {
            Iterator<Integer> it = list.iterator();
            while (it.hasNext()) {
                this.counter += it.next().intValue();
            }
            CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = this.counter;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase$StateSourceBase.class */
    public static class StateSourceBase extends RichParallelSourceFunction<Integer> {
        private static volatile CountDownLatch workStartedLatch = new CountDownLatch(1);
        protected volatile int counter;
        protected volatile boolean running;

        private StateSourceBase() {
            this.counter = 0;
            this.running = true;
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            Object checkpointLock = sourceContext.getCheckpointLock();
            while (this.running) {
                synchronized (checkpointLock) {
                    this.counter++;
                    sourceContext.collect(1);
                }
                Thread.sleep(2L);
                if (this.counter == 10) {
                    workStartedLatch.countDown();
                }
                if (this.counter >= 500) {
                    return;
                }
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase$SubtaskIndexFlatMapper.class */
    public static class SubtaskIndexFlatMapper extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>> implements CheckpointedFunction {
        private static final long serialVersionUID = 5273172591283191348L;
        private static volatile CountDownLatch workCompletedLatch = new CountDownLatch(1);
        private transient ValueState<Integer> counter;
        private transient ValueState<Integer> sum;
        private final int numberElements;

        SubtaskIndexFlatMapper(int i) {
            this.numberElements = i;
        }

        public void flatMap(Integer num, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
            int intValue = ((Integer) this.counter.value()).intValue() + 1;
            this.counter.update(Integer.valueOf(intValue));
            int intValue2 = ((Integer) this.sum.value()).intValue() + num.intValue();
            this.sum.update(Integer.valueOf(intValue2));
            if (intValue % this.numberElements == 0) {
                collector.collect(Tuple2.of(Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), Integer.valueOf(intValue2)));
                workCompletedLatch.countDown();
            }
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.counter = functionInitializationContext.getKeyedStateStore().getState(new ValueStateDescriptor("counter", Integer.class, 0));
            this.sum = functionInitializationContext.getKeyedStateStore().getState(new ValueStateDescriptor("sum", Integer.class, 0));
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Integer) obj, (Collector<Tuple2<Integer, Integer>>) collector);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase$SubtaskIndexNonPartitionedStateSource.class */
    public static class SubtaskIndexNonPartitionedStateSource extends SubtaskIndexSource implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = 8388073059042040203L;

        SubtaskIndexNonPartitionedStateSource(int i, int i2, boolean z) {
            super(i, i2, z);
        }

        public List<Integer> snapshotState(long j, long j2) throws Exception {
            return Collections.singletonList(Integer.valueOf(this.counter));
        }

        public void restoreState(List<Integer> list) throws Exception {
            if (list.isEmpty() || list.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + list.size());
            }
            this.counter = list.get(0).intValue();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RescalingITCase$SubtaskIndexSource.class */
    public static class SubtaskIndexSource extends RichParallelSourceFunction<Integer> {
        private static final long serialVersionUID = -400066323594122516L;
        private final int numberKeys;
        private final int numberElements;
        private final boolean terminateAfterEmission;
        protected int counter = 0;
        private boolean running = true;

        SubtaskIndexSource(int i, int i2, boolean z) {
            this.numberKeys = i;
            this.numberElements = i2;
            this.terminateAfterEmission = z;
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            Object checkpointLock = sourceContext.getCheckpointLock();
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            while (this.running) {
                if (this.counter < this.numberElements) {
                    synchronized (checkpointLock) {
                        int i = indexOfThisSubtask;
                        while (i < this.numberKeys) {
                            sourceContext.collect(Integer.valueOf(i));
                            i += getRuntimeContext().getNumberOfParallelSubtasks();
                        }
                        this.counter++;
                    }
                } else if (this.terminateAfterEmission) {
                    this.running = false;
                } else {
                    Thread.sleep(100L);
                }
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    @BeforeClass
    public static void setup() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("local.number-taskmanager", 2);
        configuration.setInteger("taskmanager.numberOfTaskSlots", 2);
        File newFolder = temporaryFolder.newFolder();
        File newFolder2 = temporaryFolder.newFolder();
        configuration.setString("state.backend", "filesystem");
        configuration.setString("state.backend.fs.checkpointdir", newFolder.toURI().toString());
        configuration.setString("state.savepoints.dir", newFolder2.toURI().toString());
        cluster = new TestingCluster(configuration);
        cluster.start();
    }

    @AfterClass
    public static void teardown() {
        if (cluster != null) {
            cluster.shutdown();
            cluster.awaitTermination();
        }
    }

    @Test
    public void testSavepointRescalingInKeyedState() throws Exception {
        testSavepointRescalingKeyedState(false, false);
    }

    @Test
    public void testSavepointRescalingOutKeyedState() throws Exception {
        testSavepointRescalingKeyedState(true, false);
    }

    @Test
    public void testSavepointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
        testSavepointRescalingKeyedState(false, true);
    }

    @Test
    public void testSavepointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
        testSavepointRescalingKeyedState(true, true);
    }

    public void testSavepointRescalingKeyedState(boolean z, boolean z2) throws Exception {
        int i = z ? 2 : numSlots;
        int i2 = z ? numSlots : 2;
        FiniteDuration finiteDuration = new FiniteDuration(3L, TimeUnit.MINUTES);
        Deadline fromNow = finiteDuration.fromNow();
        ActorGateway actorGateway = null;
        JobID jobID = null;
        try {
            actorGateway = cluster.getLeaderGateway(fromNow.timeLeft());
            JobGraph createJobGraphWithKeyedState = createJobGraphWithKeyedState(i, 13, 42, 1000, false, 100);
            JobID jobID2 = createJobGraphWithKeyedState.getJobID();
            cluster.submitJobDetached(createJobGraphWithKeyedState);
            SubtaskIndexFlatMapper.workCompletedLatch.await(fromNow.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            Set elementsSet = CollectionSink.getElementsSet();
            HashSet hashSet = new HashSet();
            for (int i3 = 0; i3 < 42; i3++) {
                hashSet.add(Tuple2.of(Integer.valueOf(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(13, i, KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(i3), 13))), Integer.valueOf(1000 * i3)));
            }
            Assert.assertEquals(hashSet, elementsSet);
            CollectionSink.clearElementsSet();
            String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(actorGateway.ask(new JobManagerMessages.TriggerSavepoint(jobID2, Option.empty()), fromNow.timeLeft()), fromNow.timeLeft())).savepointPath();
            Future ask = actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID2), fromNow.timeLeft());
            Assert.assertTrue(Await.result(actorGateway.ask(new JobManagerMessages.CancelJob(jobID2), fromNow.timeLeft()), fromNow.timeLeft()) instanceof JobManagerMessages.CancellationSuccess);
            Await.ready(ask, fromNow.timeLeft());
            JobGraph createJobGraphWithKeyedState2 = createJobGraphWithKeyedState(i2, z2 ? -1 : 13, 42, 500, true, 100);
            createJobGraphWithKeyedState2.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
            createJobGraphWithKeyedState2.getJobID();
            cluster.submitJobAndWait(createJobGraphWithKeyedState2, false);
            jobID = null;
            Set elementsSet2 = CollectionSink.getElementsSet();
            HashSet hashSet2 = new HashSet();
            for (int i4 = 0; i4 < 42; i4++) {
                hashSet2.add(Tuple2.of(Integer.valueOf(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(13, i2, KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(i4), 13))), Integer.valueOf(i4 * 1500)));
            }
            Assert.assertEquals(hashSet2, elementsSet2);
            CollectionSink.clearElementsSet();
            if (0 == 0 || actorGateway == null) {
                return;
            }
            try {
                Await.ready(actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved((JobID) null), finiteDuration), finiteDuration);
            } catch (InterruptedException | TimeoutException e) {
                Assert.fail("Failed while cleaning up the cluster.");
            }
        } catch (Throwable th) {
            CollectionSink.clearElementsSet();
            if (jobID != null && actorGateway != null) {
                try {
                    Await.ready(actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), finiteDuration), finiteDuration);
                } catch (InterruptedException | TimeoutException e2) {
                    Assert.fail("Failed while cleaning up the cluster.");
                }
            }
            throw th;
        }
    }

    @Test
    public void testSavepointRescalingNonPartitionedStateCausesException() throws Exception {
        FiniteDuration finiteDuration = new FiniteDuration(3L, TimeUnit.MINUTES);
        Deadline fromNow = finiteDuration.fromNow();
        JobID jobID = null;
        ActorGateway actorGateway = null;
        try {
            try {
                actorGateway = cluster.getLeaderGateway(fromNow.timeLeft());
                JobGraph createJobGraphWithOperatorState = createJobGraphWithOperatorState(2, 13, OperatorCheckpointMethod.NON_PARTITIONED);
                JobID jobID2 = createJobGraphWithOperatorState.getJobID();
                cluster.submitJobDetached(createJobGraphWithOperatorState);
                StateSourceBase.workStartedLatch.await();
                Object result = Await.result(actorGateway.ask(new JobManagerMessages.TriggerSavepoint(jobID2, Option.empty()), fromNow.timeLeft()), new FiniteDuration(10L, TimeUnit.SECONDS));
                Assert.assertTrue(String.valueOf(result), result instanceof JobManagerMessages.TriggerSavepointSuccess);
                String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) result).savepointPath();
                Future ask = actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID2), fromNow.timeLeft());
                Assert.assertTrue(Await.result(actorGateway.ask(new JobManagerMessages.CancelJob(jobID2), fromNow.timeLeft()), fromNow.timeLeft()) instanceof JobManagerMessages.CancellationSuccess);
                Await.ready(ask, fromNow.timeLeft());
                JobGraph createJobGraphWithOperatorState2 = createJobGraphWithOperatorState(numSlots, 13, OperatorCheckpointMethod.NON_PARTITIONED);
                createJobGraphWithOperatorState2.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
                createJobGraphWithOperatorState2.getJobID();
                cluster.submitJobAndWait(createJobGraphWithOperatorState2, false);
                jobID = null;
                if (0 == 0 || actorGateway == null) {
                    return;
                }
                try {
                    Await.ready(actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved((JobID) null), finiteDuration), finiteDuration);
                } catch (InterruptedException | TimeoutException e) {
                    Assert.fail("Failed while cleaning up the cluster.");
                }
            } catch (Throwable th) {
                if (jobID != null && actorGateway != null) {
                    try {
                        Await.ready(actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), finiteDuration), finiteDuration);
                    } catch (InterruptedException | TimeoutException e2) {
                        Assert.fail("Failed while cleaning up the cluster.");
                    }
                }
                throw th;
            }
        } catch (JobExecutionException e3) {
            if (!(e3.getCause() instanceof IllegalStateException)) {
                throw e3;
            }
            if (jobID == null || actorGateway == null) {
                return;
            }
            try {
                Await.ready(actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), finiteDuration), finiteDuration);
            } catch (InterruptedException | TimeoutException e4) {
                Assert.fail("Failed while cleaning up the cluster.");
            }
        }
    }

    @Test
    public void testSavepointRescalingWithKeyedAndNonPartitionedState() throws Exception {
        FiniteDuration finiteDuration = new FiniteDuration(3L, TimeUnit.MINUTES);
        Deadline fromNow = finiteDuration.fromNow();
        ActorGateway actorGateway = null;
        JobID jobID = null;
        try {
            actorGateway = cluster.getLeaderGateway(fromNow.timeLeft());
            JobGraph createJobGraphWithKeyedAndNonPartitionedOperatorState = createJobGraphWithKeyedAndNonPartitionedOperatorState(2, 13, 2, 42, 1000, false, 100);
            JobID jobID2 = createJobGraphWithKeyedAndNonPartitionedOperatorState.getJobID();
            cluster.submitJobDetached(createJobGraphWithKeyedAndNonPartitionedOperatorState);
            SubtaskIndexFlatMapper.workCompletedLatch.await(fromNow.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            Set elementsSet = CollectionSink.getElementsSet();
            HashSet hashSet = new HashSet();
            for (int i = 0; i < 42; i++) {
                hashSet.add(Tuple2.of(Integer.valueOf(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(13, 2, KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(i), 13))), Integer.valueOf(1000 * i)));
            }
            Assert.assertEquals(hashSet, elementsSet);
            CollectionSink.clearElementsSet();
            String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(actorGateway.ask(new JobManagerMessages.TriggerSavepoint(jobID2, Option.empty()), fromNow.timeLeft()), fromNow.timeLeft())).savepointPath();
            Future ask = actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID2), fromNow.timeLeft());
            Assert.assertTrue(Await.result(actorGateway.ask(new JobManagerMessages.CancelJob(jobID2), fromNow.timeLeft()), fromNow.timeLeft()) instanceof JobManagerMessages.CancellationSuccess);
            Await.ready(ask, fromNow.timeLeft());
            JobGraph createJobGraphWithKeyedAndNonPartitionedOperatorState2 = createJobGraphWithKeyedAndNonPartitionedOperatorState(numSlots, 13, 2, 42, 1000 + 500, true, 100);
            createJobGraphWithKeyedAndNonPartitionedOperatorState2.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
            createJobGraphWithKeyedAndNonPartitionedOperatorState2.getJobID();
            cluster.submitJobAndWait(createJobGraphWithKeyedAndNonPartitionedOperatorState2, false);
            jobID = null;
            Set elementsSet2 = CollectionSink.getElementsSet();
            HashSet hashSet2 = new HashSet();
            for (int i2 = 0; i2 < 42; i2++) {
                hashSet2.add(Tuple2.of(Integer.valueOf(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(13, numSlots, KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(i2), 13))), Integer.valueOf(i2 * (1000 + 500))));
            }
            Assert.assertEquals(hashSet2, elementsSet2);
            CollectionSink.clearElementsSet();
            if (0 == 0 || actorGateway == null) {
                return;
            }
            try {
                Await.ready(actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved((JobID) null), finiteDuration), finiteDuration);
            } catch (InterruptedException | TimeoutException e) {
                Assert.fail("Failed while cleaning up the cluster.");
            }
        } catch (Throwable th) {
            CollectionSink.clearElementsSet();
            if (jobID != null && actorGateway != null) {
                try {
                    Await.ready(actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), finiteDuration), finiteDuration);
                } catch (InterruptedException | TimeoutException e2) {
                    Assert.fail("Failed while cleaning up the cluster.");
                }
            }
            throw th;
        }
    }

    @Test
    public void testSavepointRescalingInPartitionedOperatorState() throws Exception {
        testSavepointRescalingPartitionedOperatorState(false, OperatorCheckpointMethod.CHECKPOINTED_FUNCTION);
    }

    @Test
    public void testSavepointRescalingOutPartitionedOperatorState() throws Exception {
        testSavepointRescalingPartitionedOperatorState(true, OperatorCheckpointMethod.CHECKPOINTED_FUNCTION);
    }

    @Test
    public void testSavepointRescalingInBroadcastOperatorState() throws Exception {
        testSavepointRescalingPartitionedOperatorState(false, OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST);
    }

    @Test
    public void testSavepointRescalingOutBroadcastOperatorState() throws Exception {
        testSavepointRescalingPartitionedOperatorState(true, OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST);
    }

    @Test
    public void testSavepointRescalingInPartitionedOperatorStateList() throws Exception {
        testSavepointRescalingPartitionedOperatorState(false, OperatorCheckpointMethod.LIST_CHECKPOINTED);
    }

    @Test
    public void testSavepointRescalingOutPartitionedOperatorStateList() throws Exception {
        testSavepointRescalingPartitionedOperatorState(true, OperatorCheckpointMethod.LIST_CHECKPOINTED);
    }

    public void testSavepointRescalingPartitionedOperatorState(boolean z, OperatorCheckpointMethod operatorCheckpointMethod) throws Exception {
        int i = z ? numSlots : 2;
        int i2 = z ? 2 : numSlots;
        FiniteDuration finiteDuration = new FiniteDuration(3L, TimeUnit.MINUTES);
        Deadline fromNow = finiteDuration.fromNow();
        JobID jobID = null;
        ActorGateway actorGateway = null;
        int max = Math.max(i, i2);
        if (operatorCheckpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION || operatorCheckpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST) {
            int[] unused = PartitionedStateSource.CHECK_CORRECT_SNAPSHOT = new int[max];
            int[] unused2 = PartitionedStateSource.CHECK_CORRECT_RESTORE = new int[max];
        } else {
            int[] unused3 = PartitionedStateSourceListCheckpointed.CHECK_CORRECT_SNAPSHOT = new int[max];
            int[] unused4 = PartitionedStateSourceListCheckpointed.CHECK_CORRECT_RESTORE = new int[max];
        }
        try {
            actorGateway = cluster.getLeaderGateway(fromNow.timeLeft());
            JobGraph createJobGraphWithOperatorState = createJobGraphWithOperatorState(i, 13, operatorCheckpointMethod);
            JobID jobID2 = createJobGraphWithOperatorState.getJobID();
            cluster.submitJobDetached(createJobGraphWithOperatorState);
            Object obj = null;
            StateSourceBase.workStartedLatch.await();
            while (fromNow.hasTimeLeft()) {
                obj = Await.result(actorGateway.ask(new JobManagerMessages.TriggerSavepoint(jobID2, Option.empty()), fromNow.timeLeft()), new FiniteDuration(10L, TimeUnit.SECONDS));
                if (obj instanceof JobManagerMessages.TriggerSavepointSuccess) {
                    break;
                } else {
                    System.out.println(obj);
                }
            }
            Assert.assertTrue(obj instanceof JobManagerMessages.TriggerSavepointSuccess);
            String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) obj).savepointPath();
            Future ask = actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID2), fromNow.timeLeft());
            Assert.assertTrue(Await.result(actorGateway.ask(new JobManagerMessages.CancelJob(jobID2), fromNow.timeLeft()), fromNow.timeLeft()) instanceof JobManagerMessages.CancellationSuccess);
            Await.ready(ask, fromNow.timeLeft());
            JobGraph createJobGraphWithOperatorState2 = createJobGraphWithOperatorState(i2, 13, operatorCheckpointMethod);
            createJobGraphWithOperatorState2.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
            createJobGraphWithOperatorState2.getJobID();
            cluster.submitJobAndWait(createJobGraphWithOperatorState2, false);
            int i3 = 0;
            int i4 = 0;
            if (operatorCheckpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION) {
                for (int i5 : PartitionedStateSource.CHECK_CORRECT_SNAPSHOT) {
                    i3 += i5;
                }
                for (int i6 : PartitionedStateSource.CHECK_CORRECT_RESTORE) {
                    i4 += i6;
                }
            } else if (operatorCheckpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST) {
                for (int i7 : PartitionedStateSource.CHECK_CORRECT_SNAPSHOT) {
                    i3 += i7;
                }
                for (int i8 : PartitionedStateSource.CHECK_CORRECT_RESTORE) {
                    i4 += i8;
                }
                i3 *= i2;
            } else {
                for (int i9 : PartitionedStateSourceListCheckpointed.CHECK_CORRECT_SNAPSHOT) {
                    i3 += i9;
                }
                for (int i10 : PartitionedStateSourceListCheckpointed.CHECK_CORRECT_RESTORE) {
                    i4 += i10;
                }
            }
            Assert.assertEquals(i3, i4);
            jobID = null;
            if (0 == 0 || actorGateway == null) {
                return;
            }
            try {
                Await.ready(actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved((JobID) null), finiteDuration), finiteDuration);
            } catch (InterruptedException | TimeoutException e) {
                Assert.fail("Failed while cleaning up the cluster.");
            }
        } catch (Throwable th) {
            if (jobID != null && actorGateway != null) {
                try {
                    Await.ready(actorGateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), finiteDuration), finiteDuration);
                } catch (InterruptedException | TimeoutException e2) {
                    Assert.fail("Failed while cleaning up the cluster.");
                }
            }
            throw th;
        }
    }

    private static JobGraph createJobGraphWithOperatorState(int i, int i2, OperatorCheckpointMethod operatorCheckpointMethod) {
        SourceFunction nonPartitionedStateSource;
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(i);
        executionEnvironment.getConfig().setMaxParallelism(i2);
        executionEnvironment.enableCheckpointing(Long.MAX_VALUE);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        CountDownLatch unused = StateSourceBase.workStartedLatch = new CountDownLatch(i);
        switch (AnonymousClass3.$SwitchMap$org$apache$flink$test$checkpointing$RescalingITCase$OperatorCheckpointMethod[operatorCheckpointMethod.ordinal()]) {
            case 1:
                nonPartitionedStateSource = new PartitionedStateSource(false);
                break;
            case 2:
                nonPartitionedStateSource = new PartitionedStateSource(true);
                break;
            case 3:
                nonPartitionedStateSource = new PartitionedStateSourceListCheckpointed();
                break;
            case numSlots /* 4 */:
                nonPartitionedStateSource = new NonPartitionedStateSource();
                break;
            default:
                throw new IllegalArgumentException();
        }
        executionEnvironment.addSource(nonPartitionedStateSource).addSink(new DiscardingSink());
        return executionEnvironment.getStreamGraph().getJobGraph();
    }

    private static JobGraph createJobGraphWithKeyedState(int i, int i2, int i3, int i4, boolean z, int i5) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(i);
        if (0 < i2) {
            executionEnvironment.getConfig().setMaxParallelism(i2);
        }
        executionEnvironment.enableCheckpointing(i5);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        KeyedStream keyBy = executionEnvironment.addSource(new SubtaskIndexSource(i3, i4, z)).keyBy(new KeySelector<Integer, Integer>() { // from class: org.apache.flink.test.checkpointing.RescalingITCase.1
            private static final long serialVersionUID = -7952298871120320940L;

            public Integer getKey(Integer num) throws Exception {
                return num;
            }
        });
        CountDownLatch unused = SubtaskIndexFlatMapper.workCompletedLatch = new CountDownLatch(i3);
        keyBy.flatMap(new SubtaskIndexFlatMapper(i4)).addSink(new CollectionSink());
        return executionEnvironment.getStreamGraph().getJobGraph();
    }

    private static JobGraph createJobGraphWithKeyedAndNonPartitionedOperatorState(int i, int i2, int i3, int i4, int i5, boolean z, int i6) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(i);
        executionEnvironment.getConfig().setMaxParallelism(i2);
        executionEnvironment.enableCheckpointing(i6);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        KeyedStream keyBy = executionEnvironment.addSource(new SubtaskIndexNonPartitionedStateSource(i4, i5, z)).setParallelism(i3).keyBy(new KeySelector<Integer, Integer>() { // from class: org.apache.flink.test.checkpointing.RescalingITCase.2
            private static final long serialVersionUID = -7952298871120320940L;

            public Integer getKey(Integer num) throws Exception {
                return num;
            }
        });
        CountDownLatch unused = SubtaskIndexFlatMapper.workCompletedLatch = new CountDownLatch(i4);
        keyBy.flatMap(new SubtaskIndexFlatMapper(i5)).addSink(new CollectionSink());
        return executionEnvironment.getStreamGraph().getJobGraph();
    }
}
