/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.util.CheckpointStorageUtils;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.streaming.util.StateBackendUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public abstract class ChangelogRecoveryITCaseBase
extends TestLogger {
    /** Number of task managers the mini cluster is started with. */
    private static final int NUM_TASK_MANAGERS = 1;

    /** Number of slots offered by each task manager. */
    private static final int NUM_TASK_SLOTS = 4;

    /** Total number of slots in the mini cluster (task managers times slots per task manager). */
    protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;

    /** Element count used by the tests; equal to the size of {@link ControlledSource}'s data. */
    protected static final int TOTAL_ELEMENTS = 10000;

    /** State backend this parameterized run was constructed with. */
    protected final AbstractStateBackend delegatedStateBackend;

    /** Mini cluster created in {@link #setup()} and shut down in {@link #tearDown()}. */
    protected MiniClusterWithClientResource cluster;

    /** Class-wide temporary folder; {@code configure()} creates a fresh sub-folder in it. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Per-test {@link SharedObjects} rule, exposed for use by subclasses. */
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /**
     * Supplies the state backends each test is run against.
     *
     * @return a fixed-size collection holding one {@link HashMapStateBackend} followed by two
     *     {@link EmbeddedRocksDBStateBackend} instances, built with {@code true} and {@code false}
     */
    @Parameterized.Parameters(name = "delegated state backend type = {0}")
    public static Collection<AbstractStateBackend> parameter() {
        AbstractStateBackend hashMapBackend = new HashMapStateBackend();
        AbstractStateBackend rocksDbBackendWithFlag = new EmbeddedRocksDBStateBackend(true);
        AbstractStateBackend rocksDbBackendWithoutFlag = new EmbeddedRocksDBStateBackend(false);
        return Arrays.asList(hashMapBackend, rocksDbBackendWithFlag, rocksDbBackendWithoutFlag);
    }

    /**
     * Creates the test case for one parameterized state backend.
     *
     * @param delegatedStateBackend one of the backends supplied by {@link #parameter()}
     */
    public ChangelogRecoveryITCaseBase(AbstractStateBackend delegatedStateBackend) {
        this.delegatedStateBackend = delegatedStateBackend;
    }

    /**
     * Starts a fresh mini cluster for the test and overrides the restore mode for the changelog
     * state backend on it.
     *
     * @throws Exception if the configuration cannot be created or the cluster fails to start
     */
    @Before
    public void setup() throws Exception {
        // One task manager offering four slots.
        MiniClusterResourceConfiguration clusterConfiguration =
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(configure())
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(4)
                        .build();

        cluster = new MiniClusterWithClientResource(clusterConfiguration);
        cluster.before();
        cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
    }

    /**
     * Shuts down the mini cluster and clears the results collected by {@link CollectionSink}.
     *
     * <p>The sink keeps its results in static state, so they are cleared in a {@code finally}
     * block: a failing cluster shutdown must not leak results into the next test run. The cluster
     * may be {@code null} when {@link #setup()} failed before creating it.
     *
     * @throws IOException declared for compatibility with existing overriding subclasses
     */
    @After
    public void tearDown() throws IOException {
        try {
            if (cluster != null) {
                cluster.after();
            }
        } finally {
            CollectionSink.clearExpectedResult();
        }
    }

    /**
     * Creates a stream execution environment with checkpointing and the changelog state backend
     * enabled.
     *
     * <p>File merging and unaligned checkpoints are disabled, and a fixed-delay restart strategy
     * with a delay of zero is configured.
     *
     * @param checkpointInterval interval passed to {@code enableCheckpointing}
     * @param restartAttempts number of restart attempts for the fixed-delay restart strategy
     * @param materializationInterval periodic materialization interval in milliseconds; a negative
     *     value disables periodic materialization
     * @param materializationMaxFailure allowed number of materialization failures; only applied
     *     when periodic materialization is enabled
     * @return the configured environment
     */
    protected StreamExecutionEnvironment getEnv(
            long checkpointInterval,
            int restartAttempts,
            long materializationInterval,
            int materializationMaxFailure) {
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(checkpointInterval).enableChangelogStateBackend(true);
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, restartAttempts, 0L);

        Configuration materialization = new Configuration();
        if (materializationInterval >= 0L) {
            materialization
                    .set(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, true)
                    .set(
                            StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
                            Duration.ofMillis(materializationInterval))
                    .set(
                            StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED,
                            materializationMaxFailure);
        } else {
            materialization.set(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, false);
        }
        env.configure(materialization);
        return env;
    }

    /**
     * Same as {@link #getEnv(long, int, long, int)}, additionally configuring file-system
     * checkpoint storage at the URI of {@code checkpointFile}.
     *
     * @param checkpointFile location whose URI is used for checkpoint storage
     * @param checkpointInterval see {@link #getEnv(long, int, long, int)}
     * @param restartAttempts see {@link #getEnv(long, int, long, int)}
     * @param materializationInterval see {@link #getEnv(long, int, long, int)}
     * @param materializationMaxFailure see {@link #getEnv(long, int, long, int)}
     * @return the configured environment
     */
    protected StreamExecutionEnvironment getEnv(
            File checkpointFile,
            long checkpointInterval,
            int restartAttempts,
            long materializationInterval,
            int materializationMaxFailure) {
        StreamExecutionEnvironment environment =
                getEnv(
                        checkpointInterval,
                        restartAttempts,
                        materializationInterval,
                        materializationMaxFailure);
        CheckpointStorageUtils.configureFileSystemCheckpointStorage(
                environment, checkpointFile.toURI());
        return environment;
    }

    /**
     * Builds the job graph used by the recovery tests.
     *
     * <p>The source is keyed by the element value itself and feeds two branches: a {@link
     * CountFunction} whose output goes to a {@link CollectionSink} with parallelism 1, and a 10 ms
     * tumbling processing-time window whose window function emits nothing and whose output is
     * discarded.
     *
     * @param stateBackend state backend to configure on the job
     * @param env environment the pipeline is built on
     * @param controlledSource source feeding the pipeline
     * @param jobId id to assign to the resulting job graph
     * @return the job graph produced by {@code StateBackendUtils}
     */
    protected JobGraph buildJobGraph(
            StateBackend stateBackend,
            StreamExecutionEnvironment env,
            ControlledSource controlledSource,
            JobID jobId) {
        KeyedStream<Integer, Integer> keyedStream =
                env.addSource(controlledSource)
                        .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
                        .keyBy(element -> element);

        keyedStream.process(new CountFunction()).addSink(new CollectionSink()).setParallelism(1);

        keyedStream
                .window(TumblingProcessingTimeWindows.of(Duration.ofMillis(10L)))
                .process(
                        new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {

                            @Override
                            public void process(
                                    Integer key,
                                    ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>
                                                    .Context
                                            context,
                                    Iterable<Integer> elements,
                                    Collector<Integer> out) {
                                // Intentionally empty: this branch emits no records.
                            }
                        })
                .sinkTo(new DiscardingSink<>());

        return StateBackendUtils.configureStateBackendAndGetJobGraph(
                env, stateBackend, env.getClass().getClassLoader(), jobId);
    }

    /**
     * Runs the job to completion and checks that the sink collected exactly the expected counts.
     *
     * @param jobGraph job to submit and wait for
     * @throws Exception if the job fails or the wait is interrupted
     */
    protected void waitAndAssert(JobGraph jobGraph) throws Exception {
        waitUntilJobFinished(jobGraph);
        // JUnit's contract is assertEquals(expected, actual); keep that order so that a failure
        // message labels the two maps correctly.
        Assert.assertEquals(
                ControlledSource.getExpectedResult(), CollectionSink.getActualResult());
    }

    /**
     * Creates a job id from 16 random bytes.
     *
     * @return the newly generated job id
     */
    protected JobID generateJobID() {
        final byte[] idBytes = new byte[16];
        ThreadLocalRandom.current().nextBytes(idBytes);
        return JobID.fromByteArray(idBytes);
    }

    /**
     * Collects the state handle ids of the managed keyed state found in the latest completed
     * checkpoint of a job.
     *
     * <p>For a {@link ChangelogStateBackendHandle} the ids of its materialized state handles are
     * collected; for any other keyed state handle its own id is used.
     *
     * @param jobID job whose latest completed checkpoint is inspected
     * @param miniCluster cluster the job runs on
     * @return the collected ids, or an empty set when there is no completed checkpoint, when
     *     loading the metadata fails with a {@link FileNotFoundException} in the cause chain, or
     *     when no ids are found
     * @throws IOException if loading the checkpoint metadata fails for any other reason
     */
    public static Set<StateHandleID> getAllStateHandleId(JobID jobID, MiniCluster miniCluster)
            throws IOException, FlinkJobNotFoundException, ExecutionException,
                    InterruptedException {
        Optional<String> latestCheckpointPath =
                CommonTestUtils.getLatestCompletedCheckpointPath(jobID, miniCluster);
        if (!latestCheckpointPath.isPresent()) {
            return Collections.emptySet();
        }

        final CheckpointMetadata metadata;
        try {
            metadata = TestUtils.loadCheckpointMetadata(latestCheckpointPath.get());
        } catch (IOException e) {
            boolean fileIsMissing =
                    ExceptionUtils.findThrowable(e, FileNotFoundException.class).isPresent();
            if (fileIsMissing) {
                return Collections.emptySet();
            }
            throw e;
        }

        Set<StateHandleID> handleIds =
                metadata.getOperatorStates().stream()
                        .flatMap(operatorState -> operatorState.getStates().stream())
                        .flatMap(subtaskState -> subtaskState.getManagedKeyedState().stream())
                        .flatMap(
                                handle ->
                                        handle instanceof ChangelogStateBackendHandle
                                                ? ((ChangelogStateBackendHandle) handle)
                                                        .getMaterializedStateHandles().stream()
                                                : Stream.of(handle))
                        .map(KeyedStateHandle::getStateHandleId)
                        .collect(Collectors.toSet());

        return handleIds.isEmpty() ? Collections.<StateHandleID>emptySet() : handleIds;
    }

    /**
     * Submits the job, blocks until its result is available, and verifies that it succeeded.
     *
     * @param jobGraph job to submit
     * @throws Exception the job's serialized failure if it reported one, or any error raised while
     *     submitting or waiting
     */
    private void waitUntilJobFinished(JobGraph jobGraph) throws Exception {
        JobSubmissionResult submission = cluster.getMiniCluster().submitJob(jobGraph).get();
        JobResult result =
                cluster.getMiniCluster().requestJobResult(submission.getJobID()).get();

        Optional<SerializedThrowable> failure = result.getSerializedThrowable();
        if (failure.isPresent()) {
            throw failure.get();
        }
        Assert.assertSame(ApplicationStatus.SUCCEEDED, result.getApplicationStatus());
    }

    /**
     * Creates the cluster configuration, pointing the file-system changelog storage at a new
     * folder inside {@link #TEMPORARY_FOLDER}.
     *
     * @return the cluster configuration
     * @throws IOException if the folder cannot be created
     */
    private Configuration configure() throws IOException {
        Configuration clusterConfiguration = new Configuration();
        File changelogFolder = TEMPORARY_FOLDER.newFolder();
        FsStateChangelogStorageFactory.configure(
                clusterConfiguration, changelogFolder, Duration.ofMinutes(1L), 10);
        return clusterConfiguration;
    }

    /** Checked exception, without message or cause, that tests throw to fail a job on purpose. */
    protected static class ArtificialFailure extends Exception {}

    @FunctionalInterface
    protected static interface SerializableBooleanSupplierWithException
    extends Serializable {
        public boolean getAsBoolean() throws Exception;
    }

    protected static class CountFunction
    extends KeyedProcessFunction<Integer, Integer, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private ValueState<Integer> countState;

        protected CountFunction() {
        }

        public void open(OpenContext openContext) throws Exception {
            this.countState = this.getRuntimeContext().getState(new ValueStateDescriptor("countState", Integer.class));
        }

        public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<Tuple2<Integer, Integer>> out) throws Exception {
            Integer count = (Integer)this.countState.value();
            Integer currentCount = count == null ? 1 : count + 1;
            this.countState.update((Object)currentCount);
            out.collect((Object)Tuple2.of((Object)value, (Object)currentCount));
        }
    }

    protected static class CollectionSink
    implements SinkFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private static final Map<Integer, Integer> expectedResult = new HashMap<Integer, Integer>();

        protected CollectionSink() {
        }

        public void invoke(Tuple2<Integer, Integer> value, SinkFunction.Context context) throws Exception {
            expectedResult.merge((Integer)value.f0, (Integer)value.f1, Math::max);
        }

        public static void clearExpectedResult() {
            expectedResult.clear();
        }

        public static Map<Integer, Integer> getActualResult() {
            return expectedResult;
        }
    }

    protected static class ControlledSource
    extends RichSourceFunction<Integer>
    implements CheckpointedFunction,
    CheckpointListener {
        private static final long serialVersionUID = 1L;
        protected volatile int currentIndex;
        protected final AtomicInteger completedCheckpointNum = new AtomicInteger();
        protected volatile boolean isCanceled;
        private static final List<Integer> sourceList = Collections.unmodifiableList(ControlledSource.initSourceData(10000));
        private transient ListState<Integer> currentIndexState;
        private transient ListState<Integer> completedCheckpointNumState;

        public static List<Integer> initSourceData(int totalNum) {
            ArrayList<Integer> sourceList = new ArrayList<Integer>(totalNum);
            for (int i = 0; i < totalNum; ++i) {
                sourceList.add(ThreadLocalRandom.current().nextInt(totalNum));
            }
            return sourceList;
        }

        public static Map<Integer, Integer> getExpectedResult() {
            return sourceList.stream().collect(Collectors.toConcurrentMap(element -> element % 100, element -> 1, Integer::sum));
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.currentIndexState.update(Collections.singletonList(this.currentIndex));
            this.completedCheckpointNumState.update(Collections.singletonList(this.completedCheckpointNum.get()));
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.currentIndexState = context.getOperatorStateStore().getListState(new ListStateDescriptor("currentIndexState", Integer.class));
            this.completedCheckpointNumState = context.getOperatorStateStore().getListState(new ListStateDescriptor("completedCheckpointNumState", Integer.class));
            if (context.isRestored()) {
                this.currentIndex = (Integer)Iterables.get((Iterable)((Iterable)this.currentIndexState.get()), (int)0);
                this.completedCheckpointNum.compareAndSet(0, (Integer)Iterables.get((Iterable)((Iterable)this.completedCheckpointNumState.get()), (int)0));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (!this.isCanceled && this.currentIndex < sourceList.size()) {
                this.beforeElement(ctx);
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    if (!this.isCanceled && this.currentIndex < sourceList.size()) {
                        int currentElement = sourceList.get(this.currentIndex++);
                        ctx.collect((Object)(currentElement % 100));
                    }
                }
            }
        }

        protected void beforeElement(SourceFunction.SourceContext<Integer> ctx) throws Exception {
        }

        protected void waitWhile(SerializableBooleanSupplierWithException supplier) throws Exception {
            while (supplier.getAsBoolean()) {
                Thread.sleep(10L);
            }
        }

        protected void throwArtificialFailure() throws Exception {
            throw new ArtificialFailure();
        }

        public void cancel() {
            this.isCanceled = true;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            this.completedCheckpointNum.getAndIncrement();
        }
    }
}

