package org.apache.flink.test.checkpointing;

import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.NotNull;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.class */
public class RestoreUpgradedJobITCase extends TestLogger {
    /** Record count; has the same value (100) as the number of records emitted by {@code TestSource#run}. */
    private static final int TOTAL_RECORDS = 100;

    /** Snapshot flavour under test; filled in by the {@link Parameterized} runner from {@link #parameters()}. */
    @Parameterized.Parameter
    public TestCheckpointType checkpointType;

    /** Rule through which {@link #setupSharedObjects()} registers the latch and the running total. */
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();
    // Triggered by the test source after it has emitted all of its records; awaited by both job runners.
    private SharedReference<OneShotLatch> allDataEmittedLatch;
    // Running total that the sinks add every received value to.
    private SharedReference<AtomicLong> result;

    /** Class-wide temporary folder; its root is used as checkpoint storage and as the savepoint target directory. */
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    /** Parallelism set on both jobs; also the number of slots per task manager of {@link #CLUSTER}. */
    private static final int PARALLELISM = 4;

    /** Mini cluster shared by all test runs: two task managers with {@link #PARALLELISM} slots each. */
    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(new Configuration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(PARALLELISM).build());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase$AbstractMap.class */
    public static abstract class AbstractMap<T> extends RichMapFunction<T, T> implements CheckpointedFunction {
        protected ListState<Integer> valueState;
        protected final int id;

        private AbstractMap(int i) {
            this.id = i;
        }

        protected int calculate(int i) throws Exception {
            return i;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.valueState.add(Integer.valueOf(this.id));
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.valueState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("state", Types.INT));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase$IntMap.class */
    public static class IntMap extends AbstractMap<Integer> {
        private IntMap(int i) {
            super(i);
        }

        public Integer map(Integer num) throws Exception {
            return Integer.valueOf(calculate(num.intValue()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase$IntSink.class */
    public static class IntSink implements SinkFunction<Integer> {
        private final SharedReference<AtomicLong> result;

        public IntSink(SharedReference<AtomicLong> sharedReference) {
            this.result = sharedReference;
        }

        public void invoke(Integer num, SinkFunction.Context context) throws Exception {
            ((AtomicLong) this.result.get()).addAndGet(num.intValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase$IntSource.class */
    /** Source of the original job; emits every generated value as an {@code Integer}. */
    public static class IntSource extends TestSource<Integer> {
        public IntSource(SharedReference<OneShotLatch> sharedReference) {
            super(sharedReference);
        }

        /** Boxes {@code i} and hands it to the source context. */
        @Override
        void collect(SourceFunction.SourceContext<Integer> sourceContext, int i) {
            final Integer boxed = i;
            sourceContext.collect(boxed);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase$MapName.class */
    /**
     * Names of the six map operators shared by both job versions. The jobs pass
     * {@link #name()} as the operator uid and {@link #id()} as the operator's id.
     */
    public enum MapName {
        MAP_1,
        MAP_2,
        MAP_3,
        MAP_4,
        MAP_5,
        MAP_6;

        /**
         * Returns the 1-based position of this constant in declaration order.
         *
         * @return {@code 1} for {@code MAP_1} up to {@code 6} for {@code MAP_6}
         */
        int id() {
            final int zeroBased = ordinal();
            return 1 + zeroBased;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase$StringMap.class */
    /**
     * {@code String} flavour of {@link AbstractMap}, used by the upgraded job. In addition to
     * mapping records it validates the state it was restored with.
     */
    public static class StringMap extends AbstractMap<String> {
        private StringMap(int id) {
            super(id);
        }

        /**
         * Parses the record as an integer, passes it through {@link #calculate(int)} and renders
         * the result back to a string.
         */
        @Override
        public String map(String value) throws Exception {
            return String.valueOf(calculate(Integer.parseInt(value)));
        }

        /**
         * Initializes the list state and then checks its restored content.
         *
         * <p>An operator with a positive id must find exactly one entry that equals its id; an
         * operator with a non-positive id must find no entry at all.
         *
         * @throws IllegalStateException presumably, via {@link Preconditions#checkState}, when the
         *     restored state does not match these expectations
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            super.initializeState(context);

            // Typed iterator instead of a raw Iterator with casts on every access.
            Iterator<Integer> restored = valueState.get().iterator();
            if (id > 0) {
                Preconditions.checkState(restored.hasNext(), "Value state can not be empty.");
                Integer restoredId = restored.next();
                Preconditions.checkState(id == restoredId, String.format("Value state(%s) should be equal to id(%s).", restoredId, id));
            }
            // Nothing may be left over, whether or not an entry was consumed above.
            Preconditions.checkState(!restored.hasNext(), "Value state should be empty.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase$StringSink.class */
    public static class StringSink implements SinkFunction<String> {
        private final SharedReference<AtomicLong> result;

        public StringSink(SharedReference<AtomicLong> sharedReference) {
            this.result = sharedReference;
        }

        public void invoke(String str, SinkFunction.Context context) throws Exception {
            ((AtomicLong) this.result.get()).addAndGet(Integer.parseInt(str));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase$StringSource.class */
    /** Source of the upgraded job; emits every generated value as its decimal string. */
    public static class StringSource extends TestSource<String> {
        public StringSource(SharedReference<OneShotLatch> sharedReference) {
            super(sharedReference);
        }

        /** Renders {@code i} as a string and hands it to the source context. */
        @Override
        void collect(SourceFunction.SourceContext<String> sourceContext, int i) {
            final String rendered = String.valueOf(i);
            sourceContext.collect(rendered);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase$TestCheckpointType.class */
    /** Snapshot flavours the test is parameterized over; evaluated in {@code stopWithSnapshot}. */
    public enum TestCheckpointType {
        /** A checkpoint is triggered on the mini cluster and the job is cancelled afterwards. */
        ALIGNED_CHECKPOINT,

        /** The job is stopped with a savepoint in {@code SavepointFormatType.CANONICAL} format. */
        CANONICAL_SAVEPOINT,

        /** The job is stopped with a savepoint in {@code SavepointFormatType.NATIVE} format. */
        NATIVE_SAVEPOINT;
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase$TestSource.class */
    private static abstract class TestSource<T> implements SourceFunction<T> {
        private static final long serialVersionUID = 1;
        private final SharedReference<OneShotLatch> dataEmitted;
        private volatile boolean isRunning = true;

        public TestSource(SharedReference<OneShotLatch> sharedReference) {
            this.dataEmitted = sharedReference;
        }

        public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
            int i = 100;
            while (true) {
                int i2 = i;
                i--;
                if (i2 <= 0) {
                    break;
                }
                synchronized (sourceContext.getCheckpointLock()) {
                    collect(sourceContext, i);
                }
            }
            ((OneShotLatch) this.dataEmitted.get()).trigger();
            while (this.isRunning) {
                LockSupport.parkNanos(100000L);
            }
        }

        abstract void collect(SourceFunction.SourceContext<T> sourceContext, int i);

        public void cancel() {
            this.isRunning = false;
        }
    }

    /**
     * Registers a new latch and a new running total with {@link #sharedObjects} and stores the
     * returned references in {@link #allDataEmittedLatch} and {@link #result}.
     */
    public void setupSharedObjects() {
        final OneShotLatch latch = new OneShotLatch();
        allDataEmittedLatch = sharedObjects.add(latch);

        final AtomicLong total = new AtomicLong();
        result = sharedObjects.add(total);
    }

    /**
     * Supplies the parameter sets for the {@link Parameterized} runner: one single-element row
     * per {@link TestCheckpointType}.
     *
     * @return a two-dimensional array whose rows each hold the value injected into
     *     {@link #checkpointType}
     */
    @Parameterized.Parameters(name = "Savepoint type[{0}]")
    public static Object[][] parameters() {
        // Must be created as Object[][]: an Object[] expression is not assignable to the
        // declared Object[][] return type.
        return new Object[][] {
            {TestCheckpointType.ALIGNED_CHECKPOINT},
            {TestCheckpointType.CANONICAL_SAVEPOINT},
            {TestCheckpointType.NATIVE_SAVEPOINT}
        };
    }

    /**
     * Runs the original job, snapshots and stops it, then starts the upgraded job from that
     * snapshot. After each run the total collected by the sink is compared with
     * {@link #calculateExpectedResultBeforeSavepoint()}; the total is reset to zero in between.
     */
    @Test
    public void testRestoreUpgradedJob() throws Exception {
        setupSharedObjects();

        final String snapshotPath = runOriginalJob();
        final long totalOfOriginalJob = result.get().longValue();
        MatcherAssert.assertThat(totalOfOriginalJob, Matchers.is(calculateExpectedResultBeforeSavepoint()));

        // Start the second run from a clean total so only the upgraded job's output is counted.
        result.get().set(0L);
        runUpgradedJob(snapshotPath);
        final long totalOfUpgradedJob = result.get().longValue();
        MatcherAssert.assertThat(totalOfUpgradedJob, Matchers.is(calculateExpectedResultBeforeSavepoint()));
    }

    /**
     * Computes {@code 4 * sum(value + S)} for {@code value} in {@code 0..99}, where {@code S} is
     * the sum of {@code k * k} for {@code k} from 1 to the number of {@link MapName} constants
     * (the same numbers {@link MapName#id()} yields). The literals 100 and 4 have the values of
     * {@code TOTAL_RECORDS} and {@code PARALLELISM}.
     *
     * <p>NOTE(review): no other code in this class calls this method.
     */
    private long calculateExpectedResultAfterSavepoint() {
        final int mapCount = MapName.values().length;

        long sumOfSquares = 0;
        for (int k = mapCount; k >= 1; k--) {
            sumOfSquares += k * k;
        }

        long perCopy = 0;
        int value = 0;
        while (value < 100) {
            perCopy += value;
            perCopy += sumOfSquares;
            value++;
        }
        return 4 * perCopy;
    }

    /**
     * Computes the total the sink is expected to have collected: four times the sum of the
     * values {@code 0..99}.
     *
     * @return {@code 4 * (0 + 1 + ... + 99)}
     */
    private long calculateExpectedResultBeforeSavepoint() {
        final int recordCount = 100; // same value as TOTAL_RECORDS
        final int copies = 4; // same value as PARALLELISM

        long sumOfValues = 0;
        int value = 0;
        while (value < recordCount) {
            sumOfValues += value;
            value++;
        }
        return copies * sumOfValues;
    }

    /**
     * Builds and starts the original (Integer-typed) job, waits until all tasks run and the source
     * has emitted everything, then stops the job via {@link #stopWithSnapshot(JobClient)}.
     *
     * <p>Topology: source, MAP_5, forward, MAP_1 (slot sharing group "anotherSharingGroup"),
     * keyBy, MAP_6, rebalance, MAP_4, broadcast, MAP_2, rescale, MAP_3, sink with parallelism 1.
     *
     * @return the value returned by {@link #stopWithSnapshot(JobClient)}
     */
    @NotNull
    private String runOriginalJob() throws Exception {
        final Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.getCheckpointConfig().setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
        env.getCheckpointConfig().setCheckpointStorage("file://" + temporaryFolder.getRoot().getAbsolutePath());
        env.setParallelism(PARALLELISM);
        // Presumably large enough that no periodic checkpoint fires during the test; snapshots
        // are requested explicitly in stopWithSnapshot.
        env.enableCheckpointing(Integer.MAX_VALUE);

        env.addSource(new IntSource(allDataEmittedLatch))
                .map(new IntMap(MapName.MAP_5.id()))
                .uid(MapName.MAP_5.name())
                .forward()
                .map(new IntMap(MapName.MAP_1.id()))
                .uid(MapName.MAP_1.name())
                .slotSharingGroup("anotherSharingGroup")
                .keyBy(key -> key)
                .map(new IntMap(MapName.MAP_6.id()))
                .uid(MapName.MAP_6.name())
                .rebalance()
                .map(new IntMap(MapName.MAP_4.id()))
                .uid(MapName.MAP_4.name())
                .broadcast()
                .map(new IntMap(MapName.MAP_2.id()))
                .uid(MapName.MAP_2.name())
                .rescale()
                .map(new IntMap(MapName.MAP_3.id()))
                .uid(MapName.MAP_3.name())
                .addSink(new IntSink(result))
                .setParallelism(1);

        final JobClient client = env.executeAsync("Total sum");
        CommonTestUtils.waitForAllTaskRunning(CLUSTER.getMiniCluster(), client.getJobID(), false);

        // Wait for the full batch, then reset the latch so the upgraded job can use it again.
        allDataEmittedLatch.get().await();
        allDataEmittedLatch.get().reset();

        return stopWithSnapshot(client);
    }

    /**
     * Builds and starts the upgraded (String-typed) job with {@code str} configured as the
     * savepoint path, waits until all tasks run and the source has emitted everything, then stops
     * the job with a canonical savepoint.
     *
     * <p>Topology: source, MAP_1, forward, MAP_2 (slot sharing group "anotherSharingGroup"),
     * keyBy, MAP_3, "new_chained_map", rebalance, "new_map2", MAP_4, rescale, MAP_5, broadcast,
     * MAP_6, sink with parallelism 1. The two new maps are created with the ids -1 and -2.
     *
     * @param str value stored under {@code StateRecoveryOptions.SAVEPOINT_PATH}
     */
    private void runUpgradedJob(String str) throws Exception {
        final Configuration conf = new Configuration();
        conf.set(StateRecoveryOptions.SAVEPOINT_PATH, str);
        conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(PARALLELISM);

        env.addSource(new StringSource(allDataEmittedLatch))
                .map(new StringMap(MapName.MAP_1.id()))
                .uid(MapName.MAP_1.name())
                .forward()
                .map(new StringMap(MapName.MAP_2.id()))
                .uid(MapName.MAP_2.name())
                .slotSharingGroup("anotherSharingGroup")
                .keyBy(key -> key)
                .map(new StringMap(MapName.MAP_3.id()))
                .uid(MapName.MAP_3.name())
                .map(new StringMap(-1))
                .uid("new_chained_map")
                .rebalance()
                .map(new StringMap(-2))
                .uid("new_map2")
                .map(new StringMap(MapName.MAP_4.id()))
                .uid(MapName.MAP_4.name())
                .rescale()
                .map(new StringMap(MapName.MAP_5.id()))
                .uid(MapName.MAP_5.name())
                .broadcast()
                .map(new StringMap(MapName.MAP_6.id()))
                .uid(MapName.MAP_6.name())
                .addSink(new StringSink(result))
                .setParallelism(1);

        final JobClient client = env.executeAsync("Total sum");
        CommonTestUtils.waitForAllTaskRunning(CLUSTER.getMiniCluster(), client.getJobID(), false);

        allDataEmittedLatch.get().await();

        final String savepointDir = temporaryFolder.getRoot().getAbsolutePath();
        client.stopWithSavepoint(true, savepointDir, SavepointFormatType.CANONICAL).get();
    }

    /**
     * Takes the snapshot selected by {@link #checkpointType} and ends the job.
     *
     * <ul>
     *   <li>{@code ALIGNED_CHECKPOINT}: triggers a checkpoint on the mini cluster, then cancels the job.
     *   <li>{@code CANONICAL_SAVEPOINT} / {@code NATIVE_SAVEPOINT}: stops the job with a savepoint
     *       of the matching format, written below the root of {@link #temporaryFolder}.
     * </ul>
     *
     * @param jobClient client of the running job
     * @return the string produced by the checkpoint trigger or by the stop-with-savepoint call
     * @throws IllegalArgumentException if {@link #checkpointType} is none of the three known values
     */
    private String stopWithSnapshot(JobClient jobClient) throws InterruptedException, ExecutionException {
        if (checkpointType == TestCheckpointType.ALIGNED_CHECKPOINT) {
            final String checkpointPath = CLUSTER.getMiniCluster().triggerCheckpoint(jobClient.getJobID()).get();
            jobClient.cancel().get();
            return checkpointPath;
        }

        final SavepointFormatType format;
        if (checkpointType == TestCheckpointType.CANONICAL_SAVEPOINT) {
            format = SavepointFormatType.CANONICAL;
        } else if (checkpointType == TestCheckpointType.NATIVE_SAVEPOINT) {
            format = SavepointFormatType.NATIVE;
        } else {
            throw new IllegalArgumentException("Unknown checkpoint type: " + checkpointType);
        }

        final String savepointDir = temporaryFolder.getRoot().getAbsolutePath();
        return jobClient.stopWithSavepoint(true, savepointDir, format).get();
    }

    /*
     * Lambda deserialization hook for the two key-selector lambdas used in runOriginalJob and
     * runUpgradedJob.
     *
     * It dispatches on the implementation method name stored in the SerializedLambda, verifies
     * the recorded method kind, functional interface, interface method, signatures and
     * implementing class, and returns the matching identity lambda. Anything that does not match
     * ends in an IllegalArgumentException.
     *
     * NOTE(review): the method is marked synthetic and is not valid Java as written (a boolean is
     * initialised with -1 and then used as a switch selector, and the returned lambdas have no
     * target type). It looks like decompiler output of a compiler-generated method; confirm
     * whether it should be present in hand-maintained source at all.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First stage: switch on the name's hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case 730336957:
                if (implMethodName.equals("lambda$runUpgradedJob$909ff544$1")) {
                    z = true;
                    break;
                }
                break;
            case 1470403921:
                if (implMethodName.equals("lambda$runOriginalJob$30dad6f8$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: validate the remaining metadata and recreate the lambda.
        switch (z) {
            case false:
                // Key selector of runOriginalJob: Integer -> Integer identity.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return num;
                    };
                }
                break;
            case true:
                // Key selector of runUpgradedJob: String -> String identity.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str2 -> {
                        return str2;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
