package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

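/**
 * Integration test for checkpoint-complete notifications around a deliberate failure.
 *
 * <p>The test job runs with checkpointing enabled; one reducer subtask throws once while
 * snapshotting its state. After the job finishes, the test asserts on the checkpoint IDs that
 * every operator subtask was notified about.
 */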
public class StreamCheckpointNotifierITCase {
    /** Value written to the {@code local.number-taskmanager} setting of the test cluster. */
    private static final int NUM_TASK_MANAGERS = 2;
    /** Value written to the {@code taskmanager.numberOfTaskSlots} setting of the test cluster. */
    private static final int NUM_TASK_SLOTS = 3;
    /** Parallelism of the test job; also the length of every per-subtask checkpoint list array. */
    private static final int PARALLELISM = 6;
    /** Cluster shared by the whole class: created in {@code startCluster}, stopped in {@code stopCluster}. */
    private static ForkableFlinkMiniCluster cluster;

    /**
     * Parallel source that emits {@code index % 10} for a strided sequence of indices and records
     * every checkpoint-complete notification it receives.
     *
     * <p>The current index is the checkpointed state. Once all elements are emitted the source
     * keeps running until {@link #numPostFailureNotifications} reaches the requested count or the
     * source is cancelled.
     */
    private static class GeneratingSourceFunction extends RichSourceFunction<Long> implements ParallelSourceFunction<Long>, CheckpointListener, Checkpointed<Integer> {
        /** Completed checkpoint IDs reported to this function, one list per subtask index. */
        static final List<Long>[] completedCheckpoints = StreamCheckpointNotifierITCase.createCheckpointLists(StreamCheckpointNotifierITCase.PARALLELISM);
        /**
         * Shared counter, incremented at most once per function instance (by every listener in
         * this test) when a notification arrives after {@code OnceFailingReducer.hasFailed} is set.
         */
        static final AtomicLong numPostFailureNotifications = new AtomicLong();

        /** Upper bound (exclusive) for the index sequence. */
        private final long numElements;
        /** Value {@link #numPostFailureNotifications} must reach before {@code run} returns. */
        private final int notificationsToWaitFor;
        /** Next index to emit; this is the snapshotted state. */
        private int index;
        /** Increment between emitted indices; set to the number of parallel subtasks in {@code open}. */
        private int step;
        /** Set once this instance has contributed to {@link #numPostFailureNotifications}. */
        private volatile boolean notificationAlready;
        /** Cleared by {@code cancel} to stop both loops in {@code run}. */
        private volatile boolean isRunning = true;

        /**
         * @param j number of elements (upper bound for the index)
         * @param i number of post-failure notifications to wait for before finishing
         */
        GeneratingSourceFunction(long j, int i) {
            this.numElements = j;
            this.notificationsToWaitFor = i;
        }

        /** Initializes the stride and, unless an index is already set, the starting index. */
        public void open(Configuration configuration) throws IOException {
            this.step = getRuntimeContext().getNumberOfParallelSubtasks();
            // A non-zero index was put there by restoreState and must not be overwritten.
            if (this.index == 0) {
                this.index = getRuntimeContext().getIndexOfThisSubtask();
            }
        }

        /** Emits all elements, then idles until enough post-failure notifications were counted. */
        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            final Object lock = sourceContext.getCheckpointLock();

            while (this.isRunning && this.index < this.numElements) {
                long value = this.index % 10;
                // Advance the state and emit under the checkpoint lock so both happen together.
                synchronized (lock) {
                    this.index += this.step;
                    sourceContext.collect(Long.valueOf(value));
                }
            }

            while (this.isRunning && numPostFailureNotifications.get() < this.notificationsToWaitFor) {
                Thread.sleep(50L);
            }
        }

        /** Requests termination of {@code run}. */
        public void cancel() {
            this.isRunning = false;
        }

        /**
         * Returns the current index as the checkpointed state.
         *
         * <p>The method must be named {@code snapshotState} to implement {@code Checkpointed};
         * the decompiler-generated name {@code m574snapshotState} left the interface method
         * unimplemented.
         */
        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
            return Integer.valueOf(this.index);
        }

        /** Restores the index from a previously snapshotted value. */
        public void restoreState(Integer num) {
            this.index = num.intValue();
        }

        /**
         * Records the completed checkpoint for this subtask and, the first time this happens
         * after the reducer has failed, bumps the shared post-failure counter.
         */
        public void notifyCheckpointComplete(long j) {
            completedCheckpoints[getRuntimeContext().getIndexOfThisSubtask()].add(Long.valueOf(j));
            if (OnceFailingReducer.hasFailed && !this.notificationAlready) {
                this.notificationAlready = true;
                numPostFailureNotifications.incrementAndGet();
            }
        }
    }

    /**
     * Map function that wraps each incoming {@code Long} in a {@code Tuple1} and records every
     * checkpoint-complete notification it receives.
     */
    private static class IdentityMapFunction extends RichMapFunction<Long, Tuple1<Long>> implements CheckpointListener {
        /** Completed checkpoint IDs reported to this function, one list per subtask index. */
        static final List<Long>[] completedCheckpoints = StreamCheckpointNotifierITCase.createCheckpointLists(StreamCheckpointNotifierITCase.PARALLELISM);
        /** Set once this instance has contributed to the shared post-failure counter. */
        private volatile boolean notificationAlready;

        private IdentityMapFunction() {
        }

        /** Wraps the value unchanged into a one-field tuple. */
        public Tuple1<Long> map(Long value) throws Exception {
            return Tuple1.of(value);
        }

        /**
         * Stores the checkpoint ID under this subtask's index; on the first notification seen
         * after the reducer's failure flag is set, increments the shared counter once.
         */
        public void notifyCheckpointComplete(long checkpointId) {
            final int subtask = getRuntimeContext().getIndexOfThisSubtask();
            completedCheckpoints[subtask].add(Long.valueOf(checkpointId));

            if (OnceFailingReducer.hasFailed && !this.notificationAlready) {
                this.notificationAlready = true;
                GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
            }
        }
    }

    /**
     * Co-flat-map that forwards every element of the first input, drops every element of the
     * second input, and records every checkpoint-complete notification it receives.
     */
    private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long> implements CheckpointListener {
        /** Completed checkpoint IDs reported to this function, one list per subtask index. */
        static final List<Long>[] completedCheckpoints = StreamCheckpointNotifierITCase.createCheckpointLists(StreamCheckpointNotifierITCase.PARALLELISM);
        /** Set once this instance has contributed to the shared post-failure counter. */
        private volatile boolean notificationAlready;

        private LeftIdentityCoRichFlatMapFunction() {
        }

        /** Forwards elements of the first input unchanged. */
        public void flatMap1(Long l, Collector<Long> collector) {
            collector.collect(l);
        }

        /** Discards elements of the second input. */
        public void flatMap2(Long l, Collector<Long> collector) {
            // intentionally emits nothing
        }

        /**
         * Stores the checkpoint ID under this subtask's index; on the first notification seen
         * after the reducer's failure flag is set, increments the shared counter once.
         */
        public void notifyCheckpointComplete(long j) {
            completedCheckpoints[getRuntimeContext().getIndexOfThisSubtask()].add(Long.valueOf(j));
            if (OnceFailingReducer.hasFailed && !this.notificationAlready) {
                this.notificationAlready = true;
                GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
            }
        }

        // The decompiled (Object, Collector) bridge overloads of flatMap1/flatMap2 were removed:
        // the compiler generates those bridges itself, and spelling them out in source produces
        // a signature clash with the generated ones.
    }

    /**
     * Filter that keeps values below 100 and records every checkpoint-complete notification it
     * receives.
     */
    private static class LongRichFilterFunction extends RichFilterFunction<Long> implements CheckpointListener {
        /** Completed checkpoint IDs reported to this function, one list per subtask index. */
        static final List<Long>[] completedCheckpoints = StreamCheckpointNotifierITCase.createCheckpointLists(StreamCheckpointNotifierITCase.PARALLELISM);
        /** Set once this instance has contributed to the shared post-failure counter. */
        private volatile boolean notificationAlready;

        private LongRichFilterFunction() {
        }

        /** Accepts a value exactly when it is smaller than 100. */
        public boolean filter(Long value) {
            final long unboxed = value.longValue();
            return unboxed < 100;
        }

        /**
         * Stores the checkpoint ID under this subtask's index; on the first notification seen
         * after the reducer's failure flag is set, increments the shared counter once.
         */
        public void notifyCheckpointComplete(long checkpointId) {
            final int subtask = getRuntimeContext().getIndexOfThisSubtask();
            completedCheckpoints[subtask].add(Long.valueOf(checkpointId));

            if (OnceFailingReducer.hasFailed && !this.notificationAlready) {
                this.notificationAlready = true;
                GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
            }
        }
    }

    /**
     * Summing reducer that throws exactly once from {@code snapshotState} (on subtask 0, after
     * a randomly chosen number of reduce calls) and records every checkpoint-complete
     * notification it receives.
     */
    private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> implements Checkpointed<Long>, CheckpointListener {
        /** Set to {@code true} when the induced failure is thrown; never reset. */
        static volatile boolean hasFailed = false;
        /** ID of the checkpoint during which the induced failure was thrown. */
        static volatile long failureCheckpointID;
        /** Completed checkpoint IDs reported to this function, one list per subtask index. */
        static final List<Long>[] completedCheckpoints = StreamCheckpointNotifierITCase.createCheckpointLists(StreamCheckpointNotifierITCase.PARALLELISM);

        /** Total number of elements, used to derive the failure position. */
        private final long numElements;
        /** Number of reduce calls after which a snapshot on subtask 0 triggers the failure. */
        private long failurePos;
        /** Number of reduce calls so far; this is the snapshotted state. */
        private long count;
        /** Set once this instance has contributed to the shared post-failure counter. */
        private volatile boolean notificationAlready;

        /**
         * @param j total number of elements, used to derive the failure position
         */
        OnceFailingReducer(long j) {
            this.numElements = j;
        }

        /**
         * Picks the failure position uniformly from {@code [min, max)}, where {@code min} and
         * {@code max} are 40% and 70% of the per-subtask element count.
         *
         * <p>The previous {@code nextLong() % range} form could yield a negative remainder
         * (placing the position below {@code min}) and divided by zero for an empty range.
         */
        public void open(Configuration configuration) {
            final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
            final long failurePosMin = (long) ((0.4d * this.numElements) / parallelism);
            final long failurePosMax = (long) ((0.7d * this.numElements) / parallelism);
            final long range = failurePosMax - failurePosMin;

            // nextDouble() is in [0, 1), so the offset is in [0, range - 1].
            this.failurePos = range > 0
                    ? failurePosMin + (long) (new Random().nextDouble() * range)
                    : failurePosMin;
        }

        /** Adds the second value into the first tuple and counts the call. */
        public Tuple1<Long> reduce(Tuple1<Long> tuple1, Tuple1<Long> tuple12) {
            this.count++;
            tuple1.f0 = Long.valueOf(tuple1.f0.longValue() + tuple12.f0.longValue());
            return tuple1;
        }

        /**
         * Returns the current count, or throws the one-time test failure.
         *
         * <p>The failure is thrown only if no failure happened yet, the count has reached the
         * failure position, and this is subtask 0. The method must be named
         * {@code snapshotState} to implement {@code Checkpointed}; the decompiler-generated name
         * {@code m579snapshotState} left the interface method unimplemented.
         *
         * @throws Exception the induced "Test Failure"
         */
        public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            boolean shouldFail = !hasFailed
                    && this.count >= this.failurePos
                    && getRuntimeContext().getIndexOfThisSubtask() == 0;
            if (!shouldFail) {
                return Long.valueOf(this.count);
            }
            hasFailed = true;
            failureCheckpointID = checkpointId;
            throw new Exception("Test Failure");
        }

        /** Restores the count from a previously snapshotted value. */
        public void restoreState(Long l) {
            this.count = l.longValue();
        }

        /**
         * Stores the checkpoint ID under this subtask's index; on the first notification seen
         * after the failure flag is set, increments the shared counter once.
         */
        public void notifyCheckpointComplete(long j) {
            completedCheckpoints[getRuntimeContext().getIndexOfThisSubtask()].add(Long.valueOf(j));
            if (hasFailed && !this.notificationAlready) {
                this.notificationAlready = true;
                GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
            }
        }
    }

    /**
     * Builds the cluster configuration and starts the shared mini cluster before any test runs.
     * Any exception is printed and turned into a test failure.
     */
    @BeforeClass
    public static void startCluster() {
        try {
            final Configuration clusterConfig = new Configuration();
            clusterConfig.setInteger("local.number-taskmanager", NUM_TASK_MANAGERS);
            clusterConfig.setInteger("taskmanager.numberOfTaskSlots", NUM_TASK_SLOTS);
            clusterConfig.setString("restart-strategy.fixed-delay.delay", "0 ms");
            clusterConfig.setInteger("taskmanager.memory.size", 12);

            cluster = new ForkableFlinkMiniCluster(clusterConfig, false);
            cluster.start();
        } catch (Exception startFailure) {
            startFailure.printStackTrace();
            Assert.fail("Failed to start test cluster: " + startFailure.getMessage());
        }
    }

    /**
     * Stops the shared mini cluster after all tests ran.
     *
     * <p>Does nothing when no cluster was created, so a failed start is not followed by a
     * second, misleading failure caused by a {@code NullPointerException} here. The reference
     * is cleared whether or not stopping succeeds.
     */
    @AfterClass
    public static void stopCluster() {
        if (cluster == null) {
            return;
        }
        try {
            cluster.stop();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Failed to stop test cluster: " + e.getMessage());
        } finally {
            cluster = null;
        }
    }

    /**
     * Runs the checkpointed job (source, filter, co-flat-map, map, keyed reduce, discarding
     * sink) and verifies the checkpoint notifications recorded by every function.
     *
     * <p>For each subtask of each function the test asserts that at least one notification
     * arrived, that the checkpoint during which the reducer failed was never reported as
     * complete, and that no checkpoint ID was reported twice.
     */
    @Test
    public void testProgram() {
        try {
            final long numElements = 10000L;

            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort(), new String[0]);
            env.setParallelism(PARALLELISM);
            env.enableCheckpointing(500L);
            env.getConfig().disableSysoutLogging();

            // 30 matches five listener functions times PARALLELISM (6) — NOTE(review): confirm
            // this is the intended derivation before changing either number.
            DataStreamSource<Long> source = env.addSource(new GeneratingSourceFunction(numElements, 30));

            source.filter(new LongRichFilterFunction())
                    .connect(source)
                    .flatMap(new LeftIdentityCoRichFlatMapFunction())
                    .map(new IdentityMapFunction())
                    .startNewChain()
                    .keyBy(new int[]{0})
                    .reduce(new OnceFailingReducer(numElements))
                    .addSink(new DiscardingSink<Tuple1<Long>>());

            env.execute();

            final long failureCheckpointId = OnceFailingReducer.failureCheckpointID;
            Assert.assertNotEquals(0L, failureCheckpointId);

            // Varargs of a generic array type: only List<Long>[] values are passed in.
            @SuppressWarnings("unchecked")
            List<List<Long>[]> allNotifications = Arrays.asList(
                    GeneratingSourceFunction.completedCheckpoints,
                    LongRichFilterFunction.completedCheckpoints,
                    LeftIdentityCoRichFlatMapFunction.completedCheckpoints,
                    IdentityMapFunction.completedCheckpoints,
                    OnceFailingReducer.completedCheckpoints);

            for (List<Long>[] perSubtask : allNotifications) {
                for (List<Long> notifications : perSubtask) {
                    Assert.assertTrue("No checkpoint notification was received.", notifications.size() > 0);
                    Assert.assertFalse("Failure checkpoint was marked as completed.", notifications.contains(Long.valueOf(failureCheckpointId)));
                    Assert.assertFalse("No checkpoint received after failure.", notifications.get(notifications.size() - 1).longValue() == failureCheckpointId);
                    Assert.assertTrue("Checkpoint notification was received multiple times", notifications.size() == new HashSet<Long>(notifications).size());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Creates an array of independent, empty, mutable lists.
     *
     * @param i number of lists to create
     * @return an array of length {@code i} whose slots each hold a distinct empty list
     */
    @SuppressWarnings("unchecked") // generic array creation; only List<Long> instances are stored
    static List<Long>[] createCheckpointLists(int i) {
        List<Long>[] lists = new List[i];
        for (int slot = 0; slot < lists.length; slot++) {
            lists[slot] = new ArrayList<Long>();
        }
        return lists;
    }
}
