/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class IgnoreInFlightDataITCase
extends TestLogger {
    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(IgnoreInFlightDataITCase.getConfiguration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();
    private static final int PARALLELISM = 3;
    private SharedReference<OneShotLatch> checkpointReachSinkLatch;
    private SharedReference<AtomicLong> resultBeforeFail;
    private SharedReference<AtomicLong> result;
    private SharedReference<AtomicInteger> lastCheckpointValue;
    private int checkpointInterval = 5;

    private static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.parse((String)"48m"));
        return config;
    }

    public void setupSharedObjects() {
        this.checkpointReachSinkLatch = this.sharedObjects.add((Object)new OneShotLatch());
        this.resultBeforeFail = this.sharedObjects.add((Object)new AtomicLong());
        this.result = this.sharedObjects.add((Object)new AtomicLong());
        this.lastCheckpointValue = this.sharedObjects.add((Object)new AtomicInteger());
    }

    @Test
    public void testIgnoreInFlightDataDuringRecovery() {
        while (!this.executeIgnoreInFlightDataDuringRecovery()) {
        }
    }

    private boolean executeIgnoreInFlightDataDuringRecovery() {
        this.setupSharedObjects();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing((long)(this.checkpointInterval *= 2));
        env.disableOperatorChaining();
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ZERO);
        env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1L);
        RestartStrategyUtils.configureFixedDelayRestartStrategy((StreamExecutionEnvironment)env, (int)1, (long)0L);
        env.addSource((SourceFunction)new NumberSource(this.lastCheckpointValue)).map((MapFunction)new SlowMap(this.checkpointReachSinkLatch)).addSink((SinkFunction)new SumFailSink(this.checkpointReachSinkLatch, this.resultBeforeFail, this.result)).setParallelism(1);
        try {
            env.execute("Total sum");
        }
        catch (Exception ex) {
            this.log.error("Execution failed", (Throwable)ex);
            return false;
        }
        int sourceValueAfterRestore = ((AtomicInteger)this.lastCheckpointValue.get()).intValue() + 1;
        long resultWithoutIgnoringData = 0L;
        for (int i = 0; i <= sourceValueAfterRestore; ++i) {
            resultWithoutIgnoringData += (long)i;
        }
        MatcherAssert.assertThat((Object)((AtomicLong)this.result.get()).longValue(), (Matcher)Matchers.lessThan((Comparable)Long.valueOf(resultWithoutIgnoringData)));
        long expectedResult = ((AtomicLong)this.resultBeforeFail.get()).longValue() + (long)sourceValueAfterRestore;
        Assert.assertEquals((long)expectedResult, (long)((AtomicLong)this.result.get()).longValue());
        return true;
    }

    private static class SlowMap
    extends RichMapFunction<Integer, Integer> {
        private final SharedReference<OneShotLatch> checkpointReachSinkLatch;

        public SlowMap(SharedReference<OneShotLatch> checkpointReachSinkLatch) {
            this.checkpointReachSinkLatch = checkpointReachSinkLatch;
        }

        public Integer map(Integer value) throws Exception {
            if (this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() > 0) {
                ((OneShotLatch)this.checkpointReachSinkLatch.get()).await();
            }
            return value;
        }
    }

    private static class NumberSource
    implements SourceFunction<Integer>,
    CheckpointedFunction {
        private static final long serialVersionUID = 1L;
        private final SharedReference<AtomicInteger> lastCheckpointValue;
        private ListState<Integer> valueState;
        private volatile boolean isRunning = true;

        public NumberSource(SharedReference<AtomicInteger> lastCheckpointValue) {
            this.lastCheckpointValue = lastCheckpointValue;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            Iterator stateIt = ((Iterable)this.valueState.get()).iterator();
            boolean isRecovered = stateIt.hasNext();
            if (isRecovered) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    Integer lastValue = (Integer)stateIt.next();
                    Assert.assertEquals((long)((AtomicInteger)this.lastCheckpointValue.get()).intValue(), (long)lastValue.intValue());
                    ctx.collect((Object)(lastValue + 1));
                }
            }
            int next = 0;
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                do {
                    this.valueState.update(Collections.singletonList(++next));
                    ctx.collect((Object)next);
                } while (next < 3);
            }
            while (this.isRunning) {
                LockSupport.parkNanos(100000L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            Iterator integerIterator = ((Iterable)this.valueState.get()).iterator();
            if (!integerIterator.hasNext() || (Integer)integerIterator.next() < 3 || context.getCheckpointId() > 1L && ((AtomicInteger)this.lastCheckpointValue.get()).get() < 3) {
                throw new RuntimeException("Not enough data to guarantee the in-flight data were generated before the first checkpoint");
            }
            if (context.getCheckpointId() > 2L) {
                return;
            }
            if (context.getCheckpointId() == 2L) {
                throw new ExpectedTestException("The planned fail on the second checkpoint");
            }
            ((AtomicInteger)this.lastCheckpointValue.get()).set((Integer)((Iterable)this.valueState.get()).iterator().next());
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.valueState = context.getOperatorStateStore().getListState(new ListStateDescriptor("state", Types.INT));
        }
    }

    private static class SumFailSink
    implements SinkFunction<Integer>,
    CheckpointedFunction {
        private final SharedReference<OneShotLatch> checkpointReachSinkLatch;
        private final SharedReference<AtomicLong> resultBeforeFail;
        private final SharedReference<AtomicLong> result;

        public SumFailSink(SharedReference<OneShotLatch> checkpointReachSinkLatch, SharedReference<AtomicLong> resultBeforeFail, SharedReference<AtomicLong> result) {
            this.checkpointReachSinkLatch = checkpointReachSinkLatch;
            this.resultBeforeFail = resultBeforeFail;
            this.result = result;
        }

        public void invoke(Integer value, SinkFunction.Context context) throws Exception {
            ((AtomicLong)this.result.get()).addAndGet(value.intValue());
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            if (context.getCheckpointId() == 1L) {
                ((AtomicLong)this.resultBeforeFail.get()).set(((AtomicLong)this.result.get()).longValue());
                this.sinkCheckpointStarted();
            }
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            ((AtomicLong)this.result.get()).set(((AtomicLong)this.resultBeforeFail.get()).longValue());
        }

        public void sinkCheckpointStarted() {
            ((OneShotLatch)this.checkpointReachSinkLatch.get()).trigger();
        }
    }
}

