package org.apache.flink.test.checkpointing;

import java.util.Queue;
import java.util.Random;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.collect.EvictingQueue;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.class */
public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {
    private static final long NUM_INPUT = 2500000;
    private static final int NUM_OUTPUT = 1000;

    /* loaded from: input_file:org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase$FoldEvictingQueueSink.class */
    private static class FoldEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
        public static Queue<Long>[] queues = new Queue[6];

        private FoldEvictingQueueSink() {
        }

        public void invoke(Tuple2<Integer, Long> tuple2) throws Exception {
            if (queues[((Integer) tuple2.f0).intValue()] == null) {
                queues[((Integer) tuple2.f0).intValue()] = EvictingQueue.create(UdfStreamOperatorCheckpointingITCase.NUM_OUTPUT);
            }
            queues[((Integer) tuple2.f0).intValue()].add(tuple2.f1);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase$MinEvictingQueueSink.class */
    private static class MinEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
        public static Queue<Long>[] queues = new Queue[6];

        private MinEvictingQueueSink() {
        }

        public void invoke(Tuple2<Integer, Long> tuple2) throws Exception {
            if (queues[((Integer) tuple2.f0).intValue()] == null) {
                queues[((Integer) tuple2.f0).intValue()] = EvictingQueue.create(UdfStreamOperatorCheckpointingITCase.NUM_OUTPUT);
            }
            queues[((Integer) tuple2.f0).intValue()].add(tuple2.f1);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase$OnceFailingIdentityMapFunction.class */
    private static class OnceFailingIdentityMapFunction extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> implements Checkpointed<Long> {
        private static volatile boolean hasFailed = false;
        private final long numElements;
        private long failurePos;
        private long count;

        public OnceFailingIdentityMapFunction(long j) {
            this.numElements = j;
        }

        public void open(Configuration configuration) throws Exception {
            long numberOfParallelSubtasks = (long) ((0.4d * this.numElements) / getRuntimeContext().getNumberOfParallelSubtasks());
            this.failurePos = (new Random().nextLong() % (((long) ((0.7d * this.numElements) / getRuntimeContext().getNumberOfParallelSubtasks())) - numberOfParallelSubtasks)) + numberOfParallelSubtasks;
        }

        public Tuple2<Integer, Long> map(Tuple2<Integer, Long> tuple2) throws Exception {
            if (hasFailed || this.count < this.failurePos) {
                this.count++;
                return tuple2;
            }
            hasFailed = true;
            throw new Exception("Test Failure");
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Long m577snapshotState(long j, long j2) {
            return Long.valueOf(this.count);
        }

        public void restoreState(Long l) {
            this.count = l.longValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase$StatefulMultipleSequence.class */
    private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>> implements Checkpointed<Long> {
        private long count;

        private StatefulMultipleSequence() {
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Long>> sourceContext) throws Exception {
            Object checkpointLock = sourceContext.getCheckpointLock();
            while (this.count < UdfStreamOperatorCheckpointingITCase.NUM_INPUT) {
                synchronized (checkpointLock) {
                    for (int i = 0; i < 6; i++) {
                        sourceContext.collect(Tuple2.of(Integer.valueOf(i), Long.valueOf(this.count + 1)));
                    }
                    this.count++;
                }
            }
        }

        public void cancel() {
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Long m578snapshotState(long j, long j2) {
            return Long.valueOf(this.count);
        }

        public void restoreState(Long l) {
            this.count = l.longValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase$SumEvictingQueueSink.class */
    private static class SumEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
        public static Queue<Long>[] queues = new Queue[6];

        private SumEvictingQueueSink() {
        }

        public void invoke(Tuple2<Integer, Long> tuple2) throws Exception {
            if (queues[((Integer) tuple2.f0).intValue()] == null) {
                queues[((Integer) tuple2.f0).intValue()] = EvictingQueue.create(UdfStreamOperatorCheckpointingITCase.NUM_OUTPUT);
            }
            queues[((Integer) tuple2.f0).intValue()].add(tuple2.f1);
        }
    }

    @Override // org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase
    public void testProgram(StreamExecutionEnvironment streamExecutionEnvironment) {
        KeyedStream keyBy = streamExecutionEnvironment.addSource(new StatefulMultipleSequence()).keyBy(new int[]{0});
        keyBy.min(1).map(new OnceFailingIdentityMapFunction(NUM_INPUT)).keyBy(new int[]{0}).addSink(new MinEvictingQueueSink());
        keyBy.reduce(new ReduceFunction<Tuple2<Integer, Long>>() { // from class: org.apache.flink.test.checkpointing.UdfStreamOperatorCheckpointingITCase.1
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> tuple2, Tuple2<Integer, Long> tuple22) throws Exception {
                return Tuple2.of(tuple2.f0, Long.valueOf(((Long) tuple2.f1).longValue() + ((Long) tuple22.f1).longValue()));
            }
        }).keyBy(new int[]{0}).addSink(new SumEvictingQueueSink());
        keyBy.fold(Tuple2.of(0, 0L), new FoldFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>() { // from class: org.apache.flink.test.checkpointing.UdfStreamOperatorCheckpointingITCase.2
            public Tuple2<Integer, Long> fold(Tuple2<Integer, Long> tuple2, Tuple2<Integer, Long> tuple22) throws Exception {
                return Tuple2.of(tuple22.f0, Long.valueOf(((Long) tuple2.f1).longValue() + ((Long) tuple22.f1).longValue()));
            }
        }).keyBy(new int[]{0}).addSink(new FoldEvictingQueueSink());
    }

    @Override // org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase
    public void postSubmit() {
        for (int i = 0; i < 6; i++) {
            for (Long l : MinEvictingQueueSink.queues[i]) {
                Assert.assertTrue("Value different from 1 found, was " + l + ".", l.longValue() == 1);
            }
        }
        for (int i2 = 0; i2 < 6; i2++) {
            long j = 2499000;
            long j2 = (2499000 * (2499000 + 1)) / 2;
            while (!SumEvictingQueueSink.queues[i2].isEmpty()) {
                long j3 = j2;
                long j4 = j + 1;
                j = j3;
                j2 = j3 + j4;
                Long remove = SumEvictingQueueSink.queues[i2].remove();
                Assert.assertTrue("Unexpected reduce value " + remove + " instead of " + j2 + ".", remove.longValue() == j2);
            }
        }
        for (int i3 = 0; i3 < 6; i3++) {
            long j5 = 2499000;
            long j6 = (2499000 * (2499000 + 1)) / 2;
            while (!FoldEvictingQueueSink.queues[i3].isEmpty()) {
                long j7 = j6;
                long j8 = j5 + 1;
                j5 = j7;
                j6 = j7 + j8;
                Long remove2 = FoldEvictingQueueSink.queues[i3].remove();
                Assert.assertTrue("Unexpected fold value " + remove2 + " instead of " + j6 + ".", remove2.longValue() == j6);
            }
        }
    }
}
