package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.class */
public class KeyedStateCheckpointingITCase extends TestLogger {
    protected static final int MAX_MEM_STATE_SIZE = 10485760;
    protected static final int NUM_STRINGS = 10000;
    protected static final int NUM_KEYS = 40;
    protected static final int NUM_TASK_MANAGERS = 2;
    protected static final int NUM_TASK_SLOTS = 2;
    protected static final int PARALLELISM = 4;

    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(new MiniClusterResource.MiniClusterResourceConfiguration(getConfiguration(), 2, 2));

    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase$CounterSink.class */
    public static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> {
        private static final Map<Integer, Long> ALL_COUNTS = new ConcurrentHashMap();
        private transient ValueState<NonSerializableLong> aCounts;
        private transient ValueState<Long> bCounts;

        private CounterSink() {
        }

        public void open(Configuration configuration) throws IOException {
            this.aCounts = getRuntimeContext().getState(new ValueStateDescriptor("a", NonSerializableLong.class));
            this.bCounts = getRuntimeContext().getState(new ValueStateDescriptor("b", Long.class));
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void invoke(Tuple2<Integer, Long> tuple2) throws Exception {
            NonSerializableLong nonSerializableLong = (NonSerializableLong) this.aCounts.value();
            Long l = (Long) this.bCounts.value();
            long j = nonSerializableLong == null ? 0L : nonSerializableLong.value;
            Assert.assertEquals(j, l == null ? 0L : l.longValue());
            long j2 = j + 1;
            this.aCounts.update(NonSerializableLong.of(j2));
            this.bCounts.update(Long.valueOf(j2));
            ALL_COUNTS.put(tuple2.f0, Long.valueOf(j2));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase$IdentityKeySelector.class */
    public static class IdentityKeySelector<T> implements KeySelector<T, T> {
        private IdentityKeySelector() {
        }

        public T getKey(T t) throws Exception {
            return t;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase$IntGeneratingSourceFunction.class */
    public static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer> implements ListCheckpointed<Integer>, CheckpointListener {
        private final int numElements;
        private final int checkpointLatestAt;
        private boolean checkpointHappened;
        private int lastEmitted = -1;
        private volatile boolean isRunning = true;

        IntGeneratingSourceFunction(int i, int i2) {
            this.numElements = i;
            this.checkpointLatestAt = i2;
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            Object checkpointLock = sourceContext.getCheckpointLock();
            int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            int indexOfThisSubtask = this.lastEmitted >= 0 ? this.lastEmitted + numberOfParallelSubtasks : getRuntimeContext().getIndexOfThisSubtask();
            while (true) {
                int i = indexOfThisSubtask;
                if (!this.isRunning || i >= this.numElements) {
                    return;
                }
                if (!this.checkpointHappened) {
                    if (i < this.checkpointLatestAt) {
                        Thread.sleep(1L);
                    } else {
                        synchronized (this) {
                            while (!this.checkpointHappened) {
                                wait();
                            }
                        }
                    }
                }
                synchronized (checkpointLock) {
                    sourceContext.collect(Integer.valueOf(i % KeyedStateCheckpointingITCase.NUM_KEYS));
                    this.lastEmitted = i;
                }
                indexOfThisSubtask = i + numberOfParallelSubtasks;
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public List<Integer> snapshotState(long j, long j2) throws Exception {
            return Collections.singletonList(Integer.valueOf(this.lastEmitted));
        }

        public void restoreState(List<Integer> list) throws Exception {
            Assert.assertEquals("Test failed due to unexpected recovered state size", 1L, list.size());
            this.lastEmitted = list.get(0).intValue();
            this.checkpointHappened = true;
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            synchronized (this) {
                this.checkpointHappened = true;
                notifyAll();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase$NonSerializableLong.class */
    public static class NonSerializableLong {
        public long value;

        private NonSerializableLong(long j) {
            this.value = j;
        }

        public static NonSerializableLong of(long j) {
            return new NonSerializableLong(j);
        }

        public boolean equals(Object obj) {
            return this == obj || (obj != null && obj.getClass() == getClass() && ((NonSerializableLong) obj).value == this.value);
        }

        public int hashCode() {
            return (int) (this.value ^ (this.value >>> 32));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase$OnceFailingPartitionedSum.class */
    public static class OnceFailingPartitionedSum extends RichMapFunction<Integer, Tuple2<Integer, Long>> implements ListCheckpointed<Integer> {
        private static final Map<Integer, Long> ALL_SUMS = new ConcurrentHashMap();
        private final int failurePos;
        private int count;
        private boolean shouldFail = true;
        private transient ValueState<Long> sum;

        OnceFailingPartitionedSum(int i) {
            this.failurePos = i;
        }

        public void open(Configuration configuration) throws IOException {
            this.sum = getRuntimeContext().getState(new ValueStateDescriptor("my_state", Long.class));
        }

        public Tuple2<Integer, Long> map(Integer num) throws Exception {
            if (this.shouldFail) {
                int i = this.count;
                this.count = i + 1;
                if (i >= this.failurePos) {
                    this.shouldFail = false;
                    throw new Exception("Test Failure");
                }
            }
            Long l = (Long) this.sum.value();
            long longValue = (l == null ? 0L : l.longValue()) + num.intValue();
            this.sum.update(Long.valueOf(longValue));
            ALL_SUMS.put(num, Long.valueOf(longValue));
            return new Tuple2<>(num, Long.valueOf(longValue));
        }

        public List<Integer> snapshotState(long j, long j2) throws Exception {
            return Collections.singletonList(Integer.valueOf(this.count));
        }

        public void restoreState(List<Integer> list) throws Exception {
            Assert.assertEquals("Test failed due to unexpected recovered state size", 1L, list.size());
            this.count = list.get(0).intValue();
            this.shouldFail = false;
        }

        public void close() throws Exception {
            if (this.shouldFail) {
                Assert.fail("Test ineffective: Function cleanly finished without ever failing.");
            }
        }
    }

    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
        return configuration;
    }

    @Test
    public void testWithMemoryBackendSync() throws Exception {
        testProgramWithBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE, false));
    }

    @Test
    public void testWithMemoryBackendAsync() throws Exception {
        testProgramWithBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE, true));
    }

    @Test
    public void testWithFsBackendSync() throws Exception {
        testProgramWithBackend(new FsStateBackend(this.tmpFolder.newFolder().toURI().toString(), false));
    }

    @Test
    public void testWithFsBackendAsync() throws Exception {
        testProgramWithBackend(new FsStateBackend(this.tmpFolder.newFolder().toURI().toString(), true));
    }

    @Test
    public void testWithRocksDbBackendFull() throws Exception {
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), false);
        rocksDBStateBackend.setDbStoragePath(this.tmpFolder.newFolder().getAbsolutePath());
        testProgramWithBackend(rocksDBStateBackend);
    }

    @Test
    public void testWithRocksDbBackendIncremental() throws Exception {
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
        rocksDBStateBackend.setDbStoragePath(this.tmpFolder.newFolder().getAbsolutePath());
        testProgramWithBackend(rocksDBStateBackend);
    }

    protected void testProgramWithBackend(AbstractStateBackend abstractStateBackend) throws Exception {
        Assert.assertEquals("Broken test setup", 0L, 0L);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
        executionEnvironment.setStateBackend(abstractStateBackend);
        executionEnvironment.addSource(new IntGeneratingSourceFunction(5000, 2500)).union(new DataStream[]{executionEnvironment.addSource(new IntGeneratingSourceFunction(5000, 2500))}).keyBy(new IdentityKeySelector()).map(new OnceFailingPartitionedSum(new Random().nextInt(500) + 1500)).keyBy(new int[]{0}).addSink(new CounterSink());
        executionEnvironment.execute();
        Assert.assertEquals(40L, CounterSink.ALL_COUNTS.size());
        Assert.assertEquals(40L, OnceFailingPartitionedSum.ALL_SUMS.size());
        Iterator it = OnceFailingPartitionedSum.ALL_SUMS.entrySet().iterator();
        while (it.hasNext()) {
            Assert.assertEquals((((Integer) r0.getKey()).intValue() * 10000) / 40, ((Long) ((Map.Entry) it.next()).getValue()).longValue());
        }
        Iterator it2 = CounterSink.ALL_COUNTS.values().iterator();
        while (it2.hasNext()) {
            Assert.assertEquals(250L, ((Long) it2.next()).longValue());
        }
    }
}
