package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.class */
public abstract class ChangelogRecoveryITCaseBase extends TestLogger {
    /** Number of task managers the mini cluster is started with (see {@link #setup()}). */
    private static final int NUM_TASK_MANAGERS = 1;

    /** Slots per task manager; matches the value {@link #setup()} configures on the mini cluster. */
    private static final int NUM_TASK_SLOTS = 4;

    // Not used inside this class; presumably exposed for subclasses — TODO confirm.
    protected static final int NUM_SLOTS = 4;

    /** Number of elements in the pre-generated input list of {@link ControlledSource}. */
    protected static final int TOTAL_ELEMENTS = 10000;

    /** State backend of the current parameterized run, as passed to the constructor. */
    protected final AbstractStateBackend delegatedStateBackend;

    /** Mini cluster created in {@link #setup()} and shut down in {@link #tearDown()}. */
    protected MiniClusterWithClientResource cluster;

    /** Class-wide temporary folder; {@code configure()} creates the changelog storage folder in it. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    // Not referenced elsewhere in this class; presumably used by subclasses — TODO confirm.
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /**
     * Checked exception used to fail a job on purpose; it is thrown by
     * {@link ControlledSource#throwArtificialFailure()}.
     */
    protected static class ArtificialFailure extends Exception {
        /** Creates the failure without a message or a cause. */
        protected ArtificialFailure() {
            super();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase$CollectionSink.class */
    protected static class CollectionSink implements SinkFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;
        private static final Map<Integer, Integer> expectedResult = new HashMap();

        /* JADX WARN: Multi-variable type inference failed */
        public void invoke(Tuple2<Integer, Integer> tuple2, SinkFunction.Context context) throws Exception {
            expectedResult.merge(tuple2.f0, tuple2.f1, (v0, v1) -> {
                return Math.max(v0, v1);
            });
        }

        public static void clearExpectedResult() {
            expectedResult.clear();
        }

        public static Map<Integer, Integer> getActualResult() {
            return expectedResult;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase$ControlledSource.class */
    protected static class ControlledSource extends RichSourceFunction<Integer> implements CheckpointedFunction, CheckpointListener {
        private static final long serialVersionUID = 1;
        protected volatile int currentIndex;
        protected final AtomicInteger completedCheckpointNum = new AtomicInteger();
        protected volatile boolean isCanceled;
        private static final List<Integer> sourceList = Collections.unmodifiableList(initSourceData(ChangelogRecoveryITCaseBase.TOTAL_ELEMENTS));
        private transient ListState<Integer> currentIndexState;
        private transient ListState<Integer> completedCheckpointNumState;

        public static List<Integer> initSourceData(int i) {
            ArrayList arrayList = new ArrayList(i);
            for (int i2 = 0; i2 < i; i2 += ChangelogRecoveryITCaseBase.NUM_TASK_MANAGERS) {
                arrayList.add(Integer.valueOf(ThreadLocalRandom.current().nextInt(i)));
            }
            return arrayList;
        }

        public static Map<Integer, Integer> getExpectedResult() {
            return (Map) sourceList.stream().collect(Collectors.toConcurrentMap(num -> {
                return Integer.valueOf(num.intValue() % 100);
            }, num2 -> {
                return Integer.valueOf(ChangelogRecoveryITCaseBase.NUM_TASK_MANAGERS);
            }, (v0, v1) -> {
                return Integer.sum(v0, v1);
            }));
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.currentIndexState.update(Collections.singletonList(Integer.valueOf(this.currentIndex)));
            this.completedCheckpointNumState.update(Collections.singletonList(Integer.valueOf(this.completedCheckpointNum.get())));
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.currentIndexState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("currentIndexState", Integer.class));
            this.completedCheckpointNumState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("completedCheckpointNumState", Integer.class));
            if (functionInitializationContext.isRestored()) {
                this.currentIndex = ((Integer) Iterables.get((Iterable) this.currentIndexState.get(), 0)).intValue();
                this.completedCheckpointNum.compareAndSet(0, ((Integer) Iterables.get((Iterable) this.completedCheckpointNumState.get(), 0)).intValue());
            }
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (!this.isCanceled && this.currentIndex < sourceList.size()) {
                beforeElement(sourceContext);
                synchronized (sourceContext.getCheckpointLock()) {
                    if (!this.isCanceled && this.currentIndex < sourceList.size()) {
                        List<Integer> list = sourceList;
                        int i = this.currentIndex;
                        this.currentIndex = i + ChangelogRecoveryITCaseBase.NUM_TASK_MANAGERS;
                        sourceContext.collect(Integer.valueOf(list.get(i).intValue() % 100));
                    }
                }
            }
        }

        protected void beforeElement(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void waitWhile(SerializableBooleanSupplierWithException serializableBooleanSupplierWithException) throws Exception {
            while (serializableBooleanSupplierWithException.getAsBoolean()) {
                Thread.sleep(10L);
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void throwArtificialFailure() throws Exception {
            throw new ArtificialFailure();
        }

        public void cancel() {
            this.isCanceled = true;
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            this.completedCheckpointNum.getAndIncrement();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase$CountFunction.class */
    protected static class CountFunction extends KeyedProcessFunction<Integer, Integer, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;
        private ValueState<Integer> countState;

        public void open(Configuration configuration) throws Exception {
            this.countState = getRuntimeContext().getState(new ValueStateDescriptor("countState", Integer.class));
        }

        public void processElement(Integer num, KeyedProcessFunction<Integer, Integer, Tuple2<Integer, Integer>>.Context context, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
            Integer num2 = (Integer) this.countState.value();
            Integer valueOf = Integer.valueOf(num2 == null ? ChangelogRecoveryITCaseBase.NUM_TASK_MANAGERS : num2.intValue() + ChangelogRecoveryITCaseBase.NUM_TASK_MANAGERS);
            this.countState.update(valueOf);
            collector.collect(Tuple2.of(num, valueOf));
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, (KeyedProcessFunction<Integer, Integer, Tuple2<Integer, Integer>>.Context) context, (Collector<Tuple2<Integer, Integer>>) collector);
        }
    }

    /**
     * A serializable boolean-valued supplier whose evaluation may throw a checked exception; it
     * is the condition type accepted by {@link ControlledSource#waitWhile}.
     */
    @FunctionalInterface
    protected interface SerializableBooleanSupplierWithException extends Serializable {
        /**
         * Evaluates the condition.
         *
         * @return the current value of the condition
         * @throws Exception if the evaluation fails
         */
        boolean getAsBoolean() throws Exception;
    }

    /**
     * Supplies the state backends the parameterized test is run with: one
     * {@link HashMapStateBackend} and two {@link EmbeddedRocksDBStateBackend}s that differ only
     * in their boolean constructor flag.
     *
     * @return the three backends, in the order heap, RocksDB (flag on), RocksDB (flag off)
     */
    @Parameterized.Parameters(name = "delegated state backend type = {0}")
    public static Collection<AbstractStateBackend> parameter() {
        AbstractStateBackend heapBackend = new HashMapStateBackend();
        AbstractStateBackend rocksDbFlagOn = new EmbeddedRocksDBStateBackend(true);
        AbstractStateBackend rocksDbFlagOff = new EmbeddedRocksDBStateBackend(false);
        return Arrays.asList(heapBackend, rocksDbFlagOn, rocksDbFlagOff);
    }

    /**
     * Creates the test case for one parameterized run.
     *
     * @param abstractStateBackend state backend of this run, one of the values returned by
     *     {@link #parameter()}; stored in {@link #delegatedStateBackend}
     */
    public ChangelogRecoveryITCaseBase(AbstractStateBackend abstractStateBackend) {
        this.delegatedStateBackend = abstractStateBackend;
    }

    /**
     * Starts a fresh mini cluster for the test, using the configuration from
     * {@code configure()}, and then overrides the restore mode for the changelog state backend
     * on it.
     *
     * @throws Exception if the cluster cannot be configured or started
     */
    @Before
    public void setup() throws Exception {
        MiniClusterResourceConfiguration clusterConfiguration =
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(configure())
                        .setNumberTaskManagers(NUM_TASK_MANAGERS)
                        .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
                        .build();
        this.cluster = new MiniClusterWithClientResource(clusterConfiguration);
        this.cluster.before();
        this.cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
    }

    /**
     * Shuts down the mini cluster started by {@link #setup()} and clears the results recorded by
     * {@link CollectionSink}.
     *
     * <p>JUnit runs {@code @After} methods even when a {@code @Before} method failed, so the
     * cluster may not have been created; that case is skipped instead of failing with a
     * {@link NullPointerException}. The sink results are static, so they are cleared in a
     * {@code finally} block to keep a failing shutdown from leaking them into the next run.
     *
     * @throws IOException declared for callers and subclasses; kept from the original signature
     */
    @After
    public void tearDown() throws IOException {
        try {
            if (this.cluster != null) {
                this.cluster.after();
            }
        } finally {
            CollectionSink.clearExpectedResult();
        }
    }

    /**
     * Creates an execution environment with checkpointing and the changelog state backend
     * enabled, unaligned checkpoints disabled, and a fixed-delay restart strategy without delay.
     *
     * @param stateBackend state backend to set on the environment
     * @param j checkpoint interval passed to {@code enableCheckpointing}
     * @param i number of restart attempts of the fixed-delay restart strategy
     * @param j2 periodic materialization interval, in milliseconds
     * @param i2 maximum number of materialization failures allowed
     * @return the configured environment
     */
    public StreamExecutionEnvironment getEnv(StateBackend stateBackend, long j, int i, long j2, int i2) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(j).enableChangelogStateBackend(true);
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
        env.setStateBackend(stateBackend).setRestartStrategy(RestartStrategies.fixedDelayRestart(i, 0L));

        Configuration changelogOptions =
                new Configuration()
                        .set(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, Duration.ofMillis(j2))
                        .set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, Integer.valueOf(i2));
        env.configure(changelogOptions);
        return env;
    }

    /**
     * Same as {@link #getEnv(StateBackend, long, int, long, int)}, and additionally uses the
     * given directory as checkpoint storage.
     *
     * @param stateBackend state backend to set on the environment
     * @param file directory whose URI becomes the checkpoint storage location
     * @param j checkpoint interval
     * @param i number of restart attempts
     * @param j2 periodic materialization interval, in milliseconds
     * @param i2 maximum number of materialization failures allowed
     * @return the configured environment
     */
    public StreamExecutionEnvironment getEnv(StateBackend stateBackend, File file, long j, int i, long j2, int i2) {
        StreamExecutionEnvironment configuredEnv = getEnv(stateBackend, j, i, j2, i2);
        configuredEnv.getCheckpointConfig().setCheckpointStorage(file.toURI());
        return configuredEnv;
    }

    /**
     * Builds the test job: the source is keyed by element value and feeds two branches, a
     * {@link CountFunction} writing to a {@link CollectionSink}, and a 10 ms tumbling
     * processing-time window whose function emits nothing and ends in a {@link DiscardingSink}.
     *
     * @param streamExecutionEnvironment environment to build the topology in
     * @param controlledSource source of the job
     * @param jobID id to assign to the resulting job graph
     * @return the job graph of the topology
     */
    public JobGraph buildJobGraph(StreamExecutionEnvironment streamExecutionEnvironment, ControlledSource controlledSource, JobID jobID) {
        KeyedStream<Integer, Integer> keyedStream =
                streamExecutionEnvironment
                        .addSource(controlledSource)
                        .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
                        .keyBy(num -> num);

        // CollectionSink merges into a static HashMap, which is not safe for concurrent writers,
        // so the sink stays at a single subtask regardless of how the cluster is sized.
        keyedStream.process(new CountFunction()).addSink(new CollectionSink()).setParallelism(1);

        keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(10L)))
                .process(
                        new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
                            @Override
                            public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Integer> out) {
                                // Intentionally empty: this branch produces no output.
                            }
                        })
                .addSink(new DiscardingSink<>());

        return streamExecutionEnvironment.getStreamGraph().getJobGraph(streamExecutionEnvironment.getClass().getClassLoader(), jobID);
    }

    /**
     * Runs the job to completion and checks that the per-key results recorded by
     * {@link CollectionSink} match the counts derived from the source data.
     *
     * @param jobGraph job to submit
     * @throws Exception if the job fails or cannot be submitted
     */
    public void waitAndAssert(JobGraph jobGraph) throws Exception {
        waitUntilJobFinished(jobGraph);
        // JUnit's contract is assertEquals(expected, actual); keeping that order makes the
        // failure message label the two maps correctly.
        Assert.assertEquals(ControlledSource.getExpectedResult(), CollectionSink.getActualResult());
    }

    /**
     * Creates a job id from 16 random bytes.
     *
     * @return a randomly generated job id
     */
    public JobID generateJobID() {
        final byte[] randomBytes = new byte[2 * Long.BYTES];
        ThreadLocalRandom.current().nextBytes(randomBytes);
        return JobID.fromByteArray(randomBytes);
    }

    /**
     * Collects the ids of the managed keyed state handles referenced by the job's latest
     * completed checkpoint. A {@link ChangelogStateBackendHandle} contributes the ids of its
     * materialized state handles rather than its own id.
     *
     * @param jobID job whose latest completed checkpoint is inspected
     * @param miniCluster cluster the job runs on
     * @return the handle ids; empty when there is no completed checkpoint yet, or when loading
     *     the checkpoint metadata fails because of a {@link FileNotFoundException}
     * @throws IOException if the checkpoint metadata cannot be loaded for any other reason
     */
    public static Set<StateHandleID> getAllStateHandleId(JobID jobID, MiniCluster miniCluster) throws IOException, FlinkJobNotFoundException, ExecutionException, InterruptedException {
        Optional<String> checkpointPath = CommonTestUtils.getLatestCompletedCheckpointPath(jobID, miniCluster);
        if (!checkpointPath.isPresent()) {
            return Collections.emptySet();
        }

        Set<StateHandleID> handleIds;
        try {
            handleIds =
                    TestUtils.loadCheckpointMetadata(checkpointPath.get()).getOperatorStates().stream()
                            .flatMap(operatorState -> operatorState.getStates().stream())
                            .flatMap(subtaskState -> subtaskState.getManagedKeyedState().stream())
                            .flatMap(
                                    handle ->
                                            handle instanceof ChangelogStateBackendHandle
                                                    ? ((ChangelogStateBackendHandle) handle).getMaterializedStateHandles().stream()
                                                    : Stream.of(handle))
                            .map(handle -> handle.getStateHandleId())
                            .collect(Collectors.toSet());
        } catch (IOException loadFailure) {
            // A missing file is reported as "no handles"; everything else is propagated.
            if (ExceptionUtils.findThrowable(loadFailure, FileNotFoundException.class).isPresent()) {
                return Collections.emptySet();
            }
            throw loadFailure;
        }

        if (handleIds.isEmpty()) {
            return Collections.emptySet();
        }
        return handleIds;
    }

    /**
     * Submits the job to the mini cluster, blocks until its result is available, rethrows the
     * job's failure if there is one, and otherwise asserts that the job succeeded.
     *
     * @param jobGraph job to run
     * @throws Exception the job's serialized failure, or any submission or wait error
     */
    private void waitUntilJobFinished(JobGraph jobGraph) throws Exception {
        JobSubmissionResult submission = this.cluster.getMiniCluster().submitJob(jobGraph).get();
        JobResult jobResult = this.cluster.getMiniCluster().requestJobResult(submission.getJobID()).get();

        Optional<SerializedThrowable> failure = jobResult.getSerializedThrowable();
        if (failure.isPresent()) {
            throw failure.get();
        }
        Assert.assertSame(ApplicationStatus.SUCCEEDED, jobResult.getApplicationStatus());
    }

    /**
     * Builds the cluster configuration, pointing the file-system changelog storage at a new
     * folder under {@link #TEMPORARY_FOLDER}.
     *
     * @return the cluster configuration
     * @throws IOException if the folder cannot be created
     */
    private Configuration configure() throws IOException {
        File changelogStorageDir = TEMPORARY_FOLDER.newFolder();
        Configuration clusterConfig = new Configuration();
        // NOTE(review): the one-minute duration and the 10 are forwarded as-is; their meaning is
        // defined by FsStateChangelogStorageFactory.configure — confirm there.
        FsStateChangelogStorageFactory.configure(clusterConfig, changelogStorageDir, Duration.ofMinutes(1L), 10);
        return clusterConfig;
    }

    // Lambda deserialization hook, marked synthetic: it maps a SerializedLambda description back
    // to the matching lambda of this class and rejects anything it does not recognise.
    //
    // NOTE(review): this looks like a decompiler rendering of a compiler-generated method rather
    // than hand-written code. As written it is not valid Java source (`boolean z = -1`, a switch
    // over a boolean, and a lambda returned as Object without a target type). Presumably it
    // should be dropped when this file is restored to real source — confirm before relying on it.
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: resolve the implementation method name (by hash code, then equals) to a
        // case selector for the second switch.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -565199254:
                if (implMethodName.equals("lambda$buildJobGraph$f78bf84b$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the full lambda description before handing back the lambda.
        switch (z) {
            case false:
                // Matches a KeySelector#getKey lambda implemented in this class with signature
                // Integer -> Integer; the lambda returned for it is the identity function.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return num;
                    };
                }
                break;
        }
        // No known lambda matched the description.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
