/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.shuffle.ShuffleServiceOptions;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.checkpointing.SharedPoolNettyShuffleServiceFactory;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.ErrorCollector;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={FailsWithAdaptiveScheduler.class})
public abstract class UnalignedCheckpointTestBase
extends TestLogger {
    /** Logger of this test base; the nested helper classes log through it as well. */
    protected static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointTestBase.class);
    // Accumulator names. NUM_OUTPUTS, NUM_OUT_OF_ORDER, NUM_DUPLICATES, NUM_LOST and NUM_FAILURES
    // are the names under which VerifyingSinkBase#open registers its counters. NUM_INPUTS looks
    // like a name prefix (note the trailing underscore); its use is not visible in this part of
    // the file.
    protected static final String NUM_INPUTS = "inputs_";
    protected static final String NUM_OUTPUTS = "outputs";
    protected static final String NUM_OUT_OF_ORDER = "outOfOrder";
    protected static final String NUM_FAILURES = "failures";
    protected static final String NUM_DUPLICATES = "duplicates";
    protected static final String NUM_LOST = "lost";
    // NOTE(review): presumably defaults for subclasses/DAG creators -- not referenced in the
    // visible part of the file.
    protected static final int BUFFER_PER_CHANNEL = 1;
    protected static final int NUM_SOURCES = 3;
    // Bit pattern 0xABCDEAFC00000000: the same value that withHeader/withoutHeader XOR into a
    // record and that checkHeader expects in the upper 32 bits.
    private static final long HEADER = -6066934754945531904L;
    // 0xFFFFFFFF00000000: selects the upper 32 bits, i.e. the header part of a record.
    private static final long HEADER_MASK = -4294967296L;
    /** Provides the checkpoint and changelog directories created in {@code execute}. */
    @Rule
    public final TemporaryFolder temp = new TemporaryFolder();
    /** JUnit error collector available to subclasses. */
    @Rule
    public ErrorCollector collector = new ErrorCollector();
    /** Name of the running test method; printed by {@code execute} to help debugging. */
    @Rule
    public TestName name = new TestName();

    /**
     * Resets the buffer pool of {@link SharedPoolNettyShuffleServiceFactory} once before the tests
     * of a class run. The argument 60 is presumably the pool size in buffers -- confirm against
     * the factory.
     */
    @BeforeClass
    public static void beforeAll() {
        SharedPoolNettyShuffleServiceFactory.resetBufferPool(60);
    }

    /**
     * Clears the buffer pool of {@link SharedPoolNettyShuffleServiceFactory} once after all tests
     * of a class have run.
     */
    @AfterClass
    public static void afterAll() {
        SharedPoolNettyShuffleServiceFactory.clearBufferPool();
    }

    /**
     * Builds the job described by {@code settings}, runs it on a dedicated mini cluster and hands
     * the job result to {@link #checkCounters(JobExecutionResult)}.
     *
     * <p>The cluster is sized from the stream graph: the required slot count is the maximum node
     * parallelism when the channel type uses slot sharing and the sum of all node parallelisms
     * otherwise; the channel type then maps that slot count to a number of task managers.
     *
     * <p>A failure whose cause chain contains a {@link TestException} is treated as an expected,
     * injected failure and swallowed; every other exception is rethrown. The mini cluster is shut
     * down in all cases once it has been started.
     *
     * @param settings test configuration (parallelism, channel type, expected failures, ...)
     * @return the most recent completed checkpoint if {@code settings.generateCheckpoint} is set,
     *     otherwise {@code null}
     * @throws Exception if setup fails or the job fails for a reason other than a
     *     {@link TestException}
     */
    @Nullable
    protected File execute(UnalignedSettings settings) throws Exception {
        final File checkpointDir = temp.newFolder();
        final Configuration conf = settings.getConfiguration(checkpointDir);
        FsStateChangelogStorageFactory.configure(conf, temp.newFolder(), Duration.ofMinutes(1L), 10);

        final StreamGraph streamGraph = getStreamGraph(settings, conf);
        final int requiredSlots =
                streamGraph.getStreamNodes().stream()
                        .mapToInt(node -> node.getParallelism())
                        .reduce(0, settings.channelType.slotSharing ? Integer::max : Integer::sum);
        final int numberTaskmanagers = settings.channelType.slotsToTaskManagers.apply(requiredSlots);
        // Round up so that all task managers together offer at least requiredSlots slots.
        final int slotsPerTM = (requiredSlots + numberTaskmanagers - 1) / numberTaskmanagers;

        final MiniClusterWithClientResource miniCluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(conf)
                                .setNumberTaskManagers(numberTaskmanagers)
                                .setNumberSlotsPerTaskManager(slotsPerTM)
                                .build());
        miniCluster.before();
        // Everything after a successful start lives inside the try block, so the cluster is torn
        // down even if the environment setup below throws.
        try {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
            settings.configure(env);

            // Print the running test to help debugging when a case gets stuck.
            final String testId = getClass().getCanonicalName() + "#" + name.getMethodName();
            System.out.println("Starting " + testId + ".");
            final CompletableFuture<JobSubmissionResult> submission =
                    miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph());
            final JobResult jobResult =
                    miniCluster.getMiniCluster().requestJobResult(submission.get().getJobID()).get();
            checkCounters(jobResult.toJobExecutionResult(getClass().getClassLoader()));
            System.out.println("Finished " + testId + ".");
        } catch (Exception e) {
            // Injected TestExceptions are part of the scenario; anything else is a real failure.
            if (!ExceptionUtils.findThrowable(e, TestException.class).isPresent()) {
                throw e;
            }
        } finally {
            miniCluster.after();
        }

        if (settings.generateCheckpoint) {
            return TestUtils.getMostRecentCompletedCheckpoint(checkpointDir);
        }
        return null;
    }

    /**
     * Creates the stream graph of the test job in a separate local environment.
     *
     * <p>The environment is configured through {@code settings}; the DAG itself is built by the
     * settings' {@code dagCreator}, which receives the minimum number of checkpoints, the slot
     * sharing flag of the channel type, the number of expected failures that are not deferred
     * until after the sources finish, and the source sleep time.
     *
     * @param settings test configuration
     * @param conf Flink configuration used to create the local environment
     * @return the stream graph of the configured job
     */
    private StreamGraph getStreamGraph(UnalignedSettings settings, Configuration conf) {
        final StreamExecutionEnvironment setupEnv = StreamExecutionEnvironment.createLocalEnvironment(conf);
        settings.configure(setupEnv);

        final boolean slotSharing = settings.channelType.slotSharing;
        final int failuresBeforeSourceFinishes = settings.expectedFailures - settings.failuresAfterSourceFinishes;
        settings.dagCreator.create(
                setupEnv,
                settings.minCheckpoints,
                slotSharing,
                failuresBeforeSourceFinishes,
                settings.sourceSleepMs);
        return setupEnv.getStreamGraph();
    }

    /**
     * Verifies the result of a finished job. Called by {@code execute} with the job's execution
     * result after the job has completed.
     */
    protected abstract void checkCounters(JobExecutionResult var1);

    /**
     * Marks {@code value} with the header bit pattern {@code 0xABCDEAFC00000000} by XOR-ing it in.
     *
     * @param value payload; must not exceed {@link Integer#MAX_VALUE}
     * @return the value with the header applied
     * @throws IllegalStateException if {@code value} is larger than {@link Integer#MAX_VALUE}
     */
    protected static long withHeader(long value) {
        final boolean fitsBelowHeader = value <= Integer.MAX_VALUE;
        Preconditions.checkState(
                fitsBelowHeader,
                "Value too large for header, this indicates that the test is running too long.");
        return value ^ 0xABCDEAFC00000000L;
    }

    /**
     * Validates the header of {@code value} and strips it again.
     *
     * @param value a value carrying the header bit pattern in its upper 32 bits
     * @return the payload without the header
     * @throws IllegalArgumentException if the header is missing
     */
    protected static long withoutHeader(long value) {
        // checkHeader returns its argument unchanged after validating it.
        final long validated = UnalignedCheckpointTestBase.checkHeader(value);
        return validated ^ 0xABCDEAFC00000000L;
    }

    /**
     * Checks that the upper 32 bits of {@code value} equal the header bit pattern
     * {@code 0xABCDEAFC}.
     *
     * @param value the value to validate
     * @return {@code value} unchanged
     * @throws IllegalArgumentException if the header is not present
     */
    protected static long checkHeader(long value) {
        final long expectedHeader = 0xABCDEAFC00000000L;
        final long headerMask = 0xFFFFFFFF00000000L;
        if ((value & headerMask) == expectedHeader) {
            return value;
        }
        throw new IllegalArgumentException(
                "Stream corrupted. Cannot find the header "
                        + Long.toHexString(expectedHeader)
                        + " in the value "
                        + Long.toHexString(value));
    }

    /**
     * Marker exception for failures that the test injects on purpose. {@code execute} swallows a
     * job failure when this type appears in the cause chain.
     */
    static class TestException extends Exception {
        /**
         * @param message description of the injected failure
         */
        public TestException(String message) {
            super(message);
        }
    }

    static class MinEmittingFunction
    extends RichCoFlatMapFunction<Long, Long, Long>
    implements CheckpointedFunction {
        private ListState<State> stateList;
        private State state;

        MinEmittingFunction() {
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.stateList.update(Collections.singletonList(this.state));
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.stateList = context.getOperatorStateStore().getListState(new ListStateDescriptor("state", State.class));
            this.state = (State)Iterables.getOnlyElement((Iterable)((Iterable)this.stateList.get()), (Object)new State());
        }

        public void flatMap1(Long value, Collector<Long> out) {
            long baseValue = UnalignedCheckpointTestBase.withoutHeader(value);
            this.state.lastLeft = baseValue;
            if (this.state.lastRight >= baseValue) {
                out.collect((Object)value);
            }
        }

        public void flatMap2(Long value, Collector<Long> out) {
            long baseValue = UnalignedCheckpointTestBase.withoutHeader(value);
            this.state.lastRight = baseValue;
            if (this.state.lastLeft >= baseValue) {
                out.collect((Object)value);
            }
        }

        private static class State {
            private long lastLeft = Long.MIN_VALUE;
            private long lastRight = Long.MIN_VALUE;

            private State() {
            }
        }
    }

    protected static abstract class VerifyingSinkBase<State extends VerifyingSinkStateBase>
    extends RichSinkFunction<Long>
    implements CheckpointedFunction,
    CheckpointListener {
        private final LongCounter numOutputCounter = new LongCounter();
        private final LongCounter outOfOrderCounter = new LongCounter();
        private final LongCounter lostCounter = new LongCounter();
        private final LongCounter duplicatesCounter = new LongCounter();
        private final IntCounter numFailures = new IntCounter();
        private final Duration backpressureInterval;
        private ListState<State> stateList;
        protected transient State state;
        protected final long minCheckpoints;
        private boolean recovered;
        @Nullable
        private Deadline backpressureUntil;

        protected VerifyingSinkBase(long minCheckpoints, long checkpointingInterval) {
            this.minCheckpoints = minCheckpoints;
            this.backpressureInterval = Duration.ofMillis(checkpointingInterval);
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            this.getRuntimeContext().addAccumulator(UnalignedCheckpointTestBase.NUM_OUTPUTS, (Accumulator)this.numOutputCounter);
            this.getRuntimeContext().addAccumulator(UnalignedCheckpointTestBase.NUM_OUT_OF_ORDER, (Accumulator)this.outOfOrderCounter);
            this.getRuntimeContext().addAccumulator(UnalignedCheckpointTestBase.NUM_DUPLICATES, (Accumulator)this.duplicatesCounter);
            this.getRuntimeContext().addAccumulator(UnalignedCheckpointTestBase.NUM_LOST, (Accumulator)this.lostCounter);
            this.getRuntimeContext().addAccumulator(UnalignedCheckpointTestBase.NUM_FAILURES, (Accumulator)this.numFailures);
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            State state = this.createState();
            this.stateList = context.getOperatorStateStore().getListState(new ListStateDescriptor("state", state.getClass()));
            this.state = (VerifyingSinkStateBase)Iterables.getOnlyElement((Iterable)((Iterable)this.stateList.get()), state);
            LOG.info("Inducing no backpressure @ {} subtask ({} attempt)", (Object)this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), (Object)this.getRuntimeContext().getTaskInfo().getAttemptNumber());
        }

        protected abstract State createState();

        protected void induceBackpressure() throws InterruptedException {
            if (this.backpressureUntil != null) {
                Thread.sleep(1L);
                if (this.backpressureUntil.isOverdue()) {
                    this.backpressureUntil = null;
                }
            }
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.stateList.update(Collections.singletonList(this.state));
            if (this.recovered) {
                this.backpressureUntil = Deadline.fromNow((Duration)this.backpressureInterval);
            }
        }

        public void notifyCheckpointComplete(long checkpointId) {
            this.recovered = true;
            ++((VerifyingSinkStateBase)this.state).completedCheckpoints;
            if (((VerifyingSinkStateBase)this.state).completedCheckpoints < this.minCheckpoints) {
                this.backpressureUntil = Deadline.fromNow((Duration)this.backpressureInterval);
                LOG.info("Inducing backpressure until {} @ {} subtask ({} attempt)", new Object[]{this.backpressureUntil, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getAttemptNumber()});
            } else {
                this.backpressureUntil = null;
                LOG.info("Inducing no backpressure @ {} subtask ({} attempt)", (Object)this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), (Object)this.getRuntimeContext().getTaskInfo().getAttemptNumber());
            }
        }

        public void close() throws Exception {
            this.numOutputCounter.add(((VerifyingSinkStateBase)this.state).numOutput);
            this.outOfOrderCounter.add(((VerifyingSinkStateBase)this.state).numOutOfOrderness);
            this.duplicatesCounter.add(((VerifyingSinkStateBase)this.state).numDuplicates);
            this.lostCounter.add(((VerifyingSinkStateBase)this.state).numLostValues);
            if (this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
                this.numFailures.add(this.getRuntimeContext().getTaskInfo().getAttemptNumber());
            }
            LOG.info("Last state {} @ {} subtask ({} attempt)", new Object[]{this.state, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getAttemptNumber()});
            super.close();
        }
    }

    /**
     * Counters tracked by a verifying sink: out-of-order, lost and duplicate records, the number
     * of outputs, and the number of completed checkpoints.
     */
    public static class VerifyingSinkStateBase {
        protected long numOutOfOrderness;
        protected long numLostValues;
        protected long numDuplicates;
        protected long numOutput;
        protected long completedCheckpoints;

        /** Renders all counters in a single {@code StateBase{...}} line. */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("StateBase{");
            text.append("numOutOfOrderness=").append(numOutOfOrderness);
            text.append(", numLostValues=").append(numLostValues);
            text.append(", numDuplicates=").append(numDuplicates);
            text.append(", numOutput=").append(numOutput);
            text.append(", completedCheckpoints=").append(completedCheckpoints);
            return text.append('}').toString();
        }
    }

    protected static class FailingMapper<T>
    extends RichMapFunction<T, T>
    implements CheckpointedFunction,
    CheckpointListener {
        private static final ListStateDescriptor<FailingMapperState> FAILING_MAPPER_STATE_DESCRIPTOR = new ListStateDescriptor("state", FailingMapperState.class);
        private ListState<FailingMapperState> listState;
        @Nullable
        private transient FailingMapperState state;
        private final FilterFunction<FailingMapperState> failDuringMap;
        private final FilterFunction<FailingMapperState> failDuringSnapshot;
        private final FilterFunction<FailingMapperState> failDuringRecovery;
        private final FilterFunction<FailingMapperState> failDuringClose;
        private transient Object lastValue;

        protected FailingMapper(FilterFunction<FailingMapperState> failDuringMap, FilterFunction<FailingMapperState> failDuringSnapshot, FilterFunction<FailingMapperState> failDuringRecovery, FilterFunction<FailingMapperState> failDuringClose) {
            this.failDuringMap = failDuringMap;
            this.failDuringSnapshot = failDuringSnapshot;
            this.failDuringRecovery = failDuringRecovery;
            this.failDuringClose = failDuringClose;
        }

        public T map(T value) throws Exception {
            this.lastValue = value instanceof Long ? Long.valueOf(UnalignedCheckpointTestBase.withoutHeader((Long)value)) : value;
            this.checkFail(this.failDuringMap, "map");
            return value;
        }

        public void checkFail(FilterFunction<FailingMapperState> failFunction, String description) throws Exception {
            if (this.state != null && failFunction.filter((Object)this.state)) {
                this.failMapper(description);
            }
        }

        private void failMapper(String description) throws Exception {
            throw new TestException("Failing " + description + " @ " + this.state.completedCheckpoints + " (" + this.state.runNumber + " attempt); last value " + this.lastValue);
        }

        public void notifyCheckpointComplete(long checkpointId) {
            if (this.state != null) {
                ++this.state.completedCheckpoints;
            }
        }

        public void notifyCheckpointAborted(long checkpointId) {
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.checkFail(this.failDuringSnapshot, "snapshotState");
            this.listState.clear();
            if (this.state != null) {
                this.listState.add((Object)this.state);
            }
        }

        public void close() throws Exception {
            this.checkFail(this.failDuringClose, "close");
            super.close();
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.listState = context.getOperatorStateStore().getListState(FAILING_MAPPER_STATE_DESCRIPTOR);
            if (this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
                this.state = (FailingMapperState)Iterables.get((Iterable)((Iterable)this.listState.get()), (int)0, (Object)new FailingMapperState(0L, 0L));
                this.state.runNumber = this.getRuntimeContext().getTaskInfo().getAttemptNumber();
            }
            this.checkFail(this.failDuringRecovery, "initializeState");
        }

        protected static class FailingMapperState {
            protected long completedCheckpoints;
            protected long runNumber;

            protected FailingMapperState(long completedCheckpoints, long runNumber) {
                this.completedCheckpoints = completedCheckpoints;
                this.runNumber = runNumber;
            }
        }
    }

    protected static class ChunkDistributingPartitioner
    implements Partitioner<Long> {
        protected ChunkDistributingPartitioner() {
        }

        public int partition(Long key, int numPartitions) {
            return (int)(UnalignedCheckpointTestBase.withoutHeader(key) / (long)numPartitions % (long)numPartitions);
        }
    }

    protected static class ShiftingPartitioner
    implements Partitioner<Long> {
        protected ShiftingPartitioner() {
        }

        public int partition(Long key, int numPartitions) {
            return (int)((UnalignedCheckpointTestBase.withoutHeader(key) + 1L) % (long)numPartitions);
        }
    }

    /**
     * Fluent configuration holder of an unaligned checkpoint test. It captures the knobs of one
     * run and translates them into an environment setup ({@link #configure}) and a cluster
     * configuration ({@link #getConfiguration}).
     */
    protected static class UnalignedSettings {
        private int parallelism;
        private final int minCheckpoints = 10;
        @Nullable
        private File restoreCheckpoint;
        private boolean generateCheckpoint = false;
        int expectedFailures = 0;
        int tolerableCheckpointFailures = 0;
        private final DagCreator dagCreator;
        private int alignmentTimeout = 0;
        private Duration checkpointTimeout = CheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue();
        private int failuresAfterSourceFinishes = 0;
        private ChannelType channelType = ChannelType.MIXED;
        private int buffersPerChannel = 1;
        private long sourceSleepMs = 0L;

        /**
         * @param dagCreator builds the job topology for these settings
         */
        public UnalignedSettings(DagCreator dagCreator) {
            this.dagCreator = dagCreator;
        }

        /** Sets the job parallelism; also drives the checkpoint interval in {@link #configure}. */
        public UnalignedSettings setParallelism(int parallelism) {
            this.parallelism = parallelism;
            return this;
        }

        /** Sets the checkpoint to restore from; {@code null} means a fresh start. */
        public UnalignedSettings setRestoreCheckpoint(File restoreCheckpoint) {
            this.restoreCheckpoint = restoreCheckpoint;
            return this;
        }

        /** Enables retaining a checkpoint that can be fed into a later run. */
        public UnalignedSettings setGenerateCheckpoint(boolean generateCheckpoint) {
            this.generateCheckpoint = generateCheckpoint;
            return this;
        }

        /** Sets the number of failures the test expects; used as the restart budget. */
        public UnalignedSettings setExpectedFailures(int expectedFailures) {
            this.expectedFailures = expectedFailures;
            return this;
        }

        /** Sets the checkpoint timeout. */
        public UnalignedSettings setCheckpointTimeout(Duration checkpointTimeout) {
            this.checkpointTimeout = checkpointTimeout;
            return this;
        }

        /** Sets the alignment timeout in milliseconds. */
        public UnalignedSettings setAlignmentTimeout(int alignmentTimeout) {
            this.alignmentTimeout = alignmentTimeout;
            return this;
        }

        /** Sets how many of the expected failures happen after the sources have finished. */
        public UnalignedSettings setFailuresAfterSourceFinishes(int failuresAfterSourceFinishes) {
            this.failuresAfterSourceFinishes = failuresAfterSourceFinishes;
            return this;
        }

        /** Sets the channel type, which controls slot sharing and the number of task managers. */
        public UnalignedSettings setChannelTypes(ChannelType channelType) {
            this.channelType = channelType;
            return this;
        }

        /** Sets the number of checkpoint failures the job tolerates. */
        public UnalignedSettings setTolerableCheckpointFailures(int tolerableCheckpointFailures) {
            this.tolerableCheckpointFailures = tolerableCheckpointFailures;
            return this;
        }

        /** Sets the number of network buffers per channel. */
        public UnalignedSettings setBuffersPerChannel(int buffersPerChannel) {
            this.buffersPerChannel = buffersPerChannel;
            return this;
        }

        /** Sets the sleep time in milliseconds that is handed to the DAG creator for the sources. */
        public UnalignedSettings setSourceSleepMs(long sourceSleepMs) {
            this.sourceSleepMs = sourceSleepMs;
            return this;
        }

        /**
         * Applies the settings to {@code env}: checkpointing with forced unaligned checkpoints,
         * timeouts, parallelism and a fixed-delay restart strategy. When a checkpoint is to be
         * generated, only half of the expected failures are allowed as restarts and checkpoints
         * are retained on cancellation.
         *
         * @param env environment to configure
         */
        public void configure(StreamExecutionEnvironment env) {
            // At least 100 ms, growing by 50 ms per parallel instance.
            final long checkpointInterval = Math.max(100L, parallelism * 50L);
            env.enableCheckpointing(checkpointInterval);
            env.getCheckpointConfig().setAlignmentTimeout(Duration.ofMillis(alignmentTimeout));
            env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout.toMillis());
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableCheckpointFailures);
            env.setParallelism(parallelism);

            final int restartAttempts = generateCheckpoint ? expectedFailures / 2 : expectedFailures;
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, Time.milliseconds(100L)));

            env.getCheckpointConfig().enableUnalignedCheckpoints(true);
            env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
            if (generateCheckpoint) {
                env.getCheckpointConfig()
                        .setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
            }
        }

        /**
         * Builds the cluster configuration: filesystem state backend writing to
         * {@code checkpointDir}, 4kb memory segments, 32 MiB of network memory, the shared-pool
         * shuffle service, and, if set, the checkpoint to restore from.
         *
         * @param checkpointDir directory that receives the checkpoints
         * @return a new configuration
         */
        public Configuration getConfiguration(File checkpointDir) {
            final Configuration conf = new Configuration();
            conf.set(TaskManagerOptions.NETWORK_MEMORY_FRACTION, 0.9f);
            conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
            conf.set(StateBackendOptions.STATE_BACKEND, "filesystem");
            conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
            if (restoreCheckpoint != null) {
                conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
            }
            conf.set(
                    ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS,
                    SharedPoolNettyShuffleServiceFactory.class.getName());
            conf.set(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel);
            conf.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 60000);

            final MemorySize networkMemory = MemorySize.ofMebiBytes(32L);
            conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, networkMemory);
            conf.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.ofMebiBytes(32L));
            conf.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1L));
            return conf;
        }

        @Override
        public String toString() {
            return "UnalignedSettings{"
                    + "parallelism=" + parallelism
                    + ", minCheckpoints=" + minCheckpoints
                    + ", restoreCheckpoint=" + restoreCheckpoint
                    + ", generateCheckpoint=" + generateCheckpoint
                    + ", expectedFailures=" + expectedFailures
                    + ", dagCreator=" + dagCreator
                    + ", alignmentTimeout=" + alignmentTimeout
                    + ", failuresAfterSourceFinishes=" + failuresAfterSourceFinishes
                    + ", channelType=" + channelType
                    + ", sourceSleepMs=" + sourceSleepMs
                    + '}';
        }
    }

    /**
     * Kind of network channels a test exercises. Each constant states whether the job uses slot
     * sharing and how the number of required slots maps to the number of task managers:
     * {@code LOCAL} always uses one task manager, {@code REMOTE} uses one per slot, and
     * {@code MIXED} uses one per slot but at most three.
     */
    protected static enum ChannelType {
        LOCAL(true, slots -> 1),
        REMOTE(false, slots -> slots),
        MIXED(true, slots -> Math.min(slots, 3));

        /** Whether the job's operators share slots. */
        final boolean slotSharing;
        /** Maps the number of required slots to the number of task managers to start. */
        final Function<Integer, Integer> slotsToTaskManagers;

        ChannelType(boolean slotSharing, Function<Integer, Integer> slotsToTaskManagers) {
            this.slotSharing = slotSharing;
            this.slotsToTaskManagers = slotsToTaskManagers;
        }

        /**
         * Returns the constant name in lower case. The conversion uses the root locale so the
         * result does not depend on the JVM's default locale (a Turkish default locale would
         * otherwise turn {@code MIXED} into {@code mıxed}).
         */
        @Override
        public String toString() {
            return name().toLowerCase(java.util.Locale.ROOT);
        }
    }

    /** Builds the topology of a test job on a given environment. */
    static interface DagCreator {
        /**
         * Adds the job's operators to {@code env}.
         *
         * @param env environment the topology is created on
         * @param minCheckpoints minimum number of checkpoints of a run
         * @param slotSharing whether the operators may share slots
         * @param expectedFailuresUntilSourceFinishes expected failures minus those that are
         *     scheduled for after the sources have finished
         * @param sourceSleepMs sleep time for the sources in milliseconds
         */
        void create(
                StreamExecutionEnvironment env,
                int minCheckpoints,
                boolean slotSharing,
                int expectedFailuresUntilSourceFinishes,
                long sourceSleepMs);
    }

    protected static class LongSource
    implements Source<Long, LongSplit, EnumeratorState> {
        // Values handed to every reader created by createReader (all except numSplits).
        private final int minCheckpoints;
        // Number of splits generated by createEnumerator.
        private final int numSplits;
        private final int expectedRestarts;
        // NOTE(review): presumably in milliseconds, like sourceSleepMs -- confirm against the reader.
        private final long checkpointingInterval;
        private final long sourceSleepMs;

        /**
         * Stores the source parameters.
         *
         * @param minCheckpoints passed on to the readers
         * @param numSplits number of splits the enumerator starts with
         * @param expectedRestarts passed on to the readers
         * @param checkpointingInterval passed on to the readers
         * @param sourceSleepMs passed on to the readers
         */
        protected LongSource(int minCheckpoints, int numSplits, int expectedRestarts, long checkpointingInterval, long sourceSleepMs) {
            this.minCheckpoints = minCheckpoints;
            this.numSplits = numSplits;
            this.expectedRestarts = expectedRestarts;
            this.checkpointingInterval = checkpointingInterval;
            this.sourceSleepMs = sourceSleepMs;
        }

        /** This source always reports itself as continuous and unbounded. */
        @Override
        public Boundedness getBoundedness() {
            final Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
            return boundedness;
        }

        /**
         * Creates the reader of one subtask, handing it the subtask index and the source
         * parameters.
         */
        @Override
        public SourceReader<Long, LongSplit> createReader(SourceReaderContext readerContext) {
            final int subtaskIndex = readerContext.getIndexOfSubtask();
            return new LongSourceReader(
                    subtaskIndex,
                    minCheckpoints,
                    expectedRestarts,
                    checkpointingInterval,
                    sourceSleepMs);
        }

        /**
         * Creates a fresh enumerator holding {@code numSplits} unassigned splits; split {@code i}
         * is constructed from {@code i} and {@code numSplits}. Restart and checkpoint counters
         * start at zero.
         */
        @Override
        public SplitEnumerator<LongSplit, EnumeratorState> createEnumerator(SplitEnumeratorContext<LongSplit> enumContext) {
            final List<LongSplit> initialSplits = new ArrayList<>();
            for (int splitIndex = 0; splitIndex < numSplits; splitIndex++) {
                initialSplits.add(new LongSplit(splitIndex, numSplits));
            }
            final EnumeratorState initialState = new EnumeratorState(initialSplits, 0, 0);
            return new LongSplitSplitEnumerator(enumContext, initialState);
        }

        /** Re-creates the enumerator on top of a previously checkpointed state. */
        @Override
        public SplitEnumerator<LongSplit, EnumeratorState> restoreEnumerator(SplitEnumeratorContext<LongSplit> enumContext, EnumeratorState state) {
            final LongSplitSplitEnumerator restored = new LongSplitSplitEnumerator(enumContext, state);
            return restored;
        }

        /** Returns a new serializer for individual splits. */
        @Override
        public SimpleVersionedSerializer<LongSplit> getSplitSerializer() {
            final SplitVersionedSerializer serializer = new SplitVersionedSerializer();
            return serializer;
        }

        /** Returns a new serializer for the enumerator's checkpointed state. */
        @Override
        public SimpleVersionedSerializer<EnumeratorState> getEnumeratorCheckpointSerializer() {
            final EnumeratorVersionedSerializer serializer = new EnumeratorVersionedSerializer();
            return serializer;
        }

        private static class SplitVersionedSerializer
        implements SimpleVersionedSerializer<LongSplit> {
            static final int LENGTH = 16;

            private SplitVersionedSerializer() {
            }

            public int getVersion() {
                return 0;
            }

            public byte[] serialize(LongSplit split) {
                byte[] bytes = new byte[16];
                ByteBuffer.wrap(bytes).putLong(split.nextNumber).putInt(split.increment);
                return bytes;
            }

            public LongSplit deserialize(int version, byte[] serialized) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(serialized);
                return new LongSplit(byteBuffer.getLong(), byteBuffer.getInt());
            }
        }

        private static class EnumeratorVersionedSerializer
        implements SimpleVersionedSerializer<EnumeratorState> {
            private final SplitVersionedSerializer splitVersionedSerializer = new SplitVersionedSerializer();

            private EnumeratorVersionedSerializer() {
            }

            public int getVersion() {
                return 0;
            }

            public byte[] serialize(EnumeratorState state) {
                ByteBuffer byteBuffer = ByteBuffer.allocate(state.unassignedSplits.size() * 16 + 8);
                byteBuffer.putInt(state.numRestarts);
                byteBuffer.putInt(state.numCompletedCheckpoints);
                for (LongSplit unassignedSplit : state.unassignedSplits) {
                    byteBuffer.put(this.splitVersionedSerializer.serialize(unassignedSplit));
                }
                return byteBuffer.array();
            }

            public EnumeratorState deserialize(int version, byte[] serialized) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(serialized);
                int numRestarts = byteBuffer.getInt();
                int numCompletedCheckpoints = byteBuffer.getInt();
                ArrayList<LongSplit> splits = new ArrayList<LongSplit>(serialized.length / 16);
                byte[] serializedSplit = new byte[16];
                while (byteBuffer.hasRemaining()) {
                    byteBuffer.get(serializedSplit);
                    splits.add(this.splitVersionedSerializer.deserialize(version, serializedSplit));
                }
                return new EnumeratorState(splits, numRestarts, numCompletedCheckpoints);
            }
        }

        /**
         * Checkpointed state of the split enumerator: the splits not yet assigned to a reader
         * plus counters for restarts and completed checkpoints.
         */
        private static class EnumeratorState {
            final List<LongSplit> unassignedSplits;
            int numRestarts;
            int numCompletedCheckpoints;

            /**
             * @param unassignedSplits splits still to be handed out; the list is stored as is
             * @param numRestarts initial restart counter
             * @param numCompletedCheckpoints initial completed-checkpoint counter
             */
            public EnumeratorState(List<LongSplit> unassignedSplits, int numRestarts, int numCompletedCheckpoints) {
                this.unassignedSplits = unassignedSplits;
                this.numRestarts = numRestarts;
                this.numCompletedCheckpoints = numCompletedCheckpoints;
            }

            @Override
            public String toString() {
                final StringBuilder text = new StringBuilder("EnumeratorState{");
                text.append("unassignedSplits=").append(unassignedSplits);
                text.append(", numRestarts=").append(numRestarts);
                text.append(", numCompletedCheckpoints=").append(numCompletedCheckpoints);
                return text.append('}').toString();
            }
        }

        /**
         * Split enumerator that hands out all unassigned splits once every parallel reader has
         * registered, and that tracks how often readers returned their splits so the restart
         * count can be broadcast to the readers.
         */
        private static class LongSplitSplitEnumerator
        implements SplitEnumerator<LongSplit, EnumeratorState> {
            private final SplitEnumeratorContext<LongSplit> context;
            private final EnumeratorState state;
            /** Restart count observed per subtask since the last broadcast. */
            private final Map<Integer, Integer> subtaskRestarts = new HashMap<Integer, Integer>();

            private LongSplitSplitEnumerator(SplitEnumeratorContext<LongSplit> context, EnumeratorState state) {
                this.state = state;
                this.context = context;
            }

            public void start() {
            }

            /** Split requests are ignored; assignment is driven by {@link #addReader(int)}. */
            public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
            }

            /**
             * Re-queues the returned splits and bumps the restart count for the subtask. The first
             * return of a subtask starts from the globally recorded restart count.
             */
            public void addSplitsBack(List<LongSplit> splits, int subtaskId) {
                LOG.info("addSplitsBack {}", splits);
                this.subtaskRestarts.compute(subtaskId, (key, previous) -> {
                    int base = previous != null ? previous : this.state.numRestarts;
                    return base + 1;
                });
                this.state.unassignedSplits.addAll(splits);
            }

            /**
             * Acts only once the number of registered readers equals the current parallelism:
             * assigns pending splits, signals "no more splits" to every reader, then broadcasts
             * the restart count if it advanced.
             */
            public void addReader(int subtaskId) {
                if (this.context.registeredReaders().size() != this.context.currentParallelism()) {
                    return;
                }
                this.assignPendingSplits();
                for (Integer readerId : this.context.registeredReaders().keySet()) {
                    this.context.signalNoMoreSplits(readerId);
                }
                this.broadcastRestartsIfAdvanced();
            }

            /** Assigns all unassigned splits, keyed by their base number, and empties the queue. */
            private void assignPendingSplits() {
                List<LongSplit> pending = this.state.unassignedSplits;
                if (pending.isEmpty()) {
                    return;
                }
                Map<Integer, List<LongSplit>> assignment = pending.stream().collect(Collectors.groupingBy(LongSplit::getBaseNumber));
                LOG.info("Assigning splits {}", assignment);
                this.context.assignSplits(new SplitsAssignment<>(assignment));
                pending.clear();
            }

            /**
             * If any subtask reported more restarts than recorded in the state, adopts the maximum,
             * resets the per-subtask tracking and sends a {@link SyncEvent} to every reader.
             */
            private void broadcastRestartsIfAdvanced() {
                Optional<Integer> highest = this.subtaskRestarts.values().stream().max(Comparator.naturalOrder());
                if (!highest.isPresent() || highest.get() <= this.state.numRestarts) {
                    return;
                }
                this.state.numRestarts = highest.get();
                this.subtaskRestarts.clear();
                SyncEvent sync = new SyncEvent(this.state.numRestarts, this.state.numCompletedCheckpoints);
                for (Integer readerId : this.context.registeredReaders().keySet()) {
                    this.context.sendEventToSourceReader(readerId, sync);
                }
            }

            public void notifyCheckpointComplete(long checkpointId) {
                this.state.numCompletedCheckpoints += 1;
            }

            /** Logs and returns the live state object (no copy is made). */
            public EnumeratorState snapshotState(long checkpointId) throws Exception {
                EnumeratorState current = this.state;
                LOG.info("snapshotState {}", current);
                return current;
            }

            public void close() throws IOException {
            }
        }

        /**
         * A split describing an arithmetic sequence of longs: the next value to emit and the
         * step between consecutive values.
         */
        private static class LongSplit
        implements SourceSplit {
            private final int increment;
            private long nextNumber;

            /**
             * @param nextNumber next value of the sequence
             * @param increment step between consecutive values
             */
            public LongSplit(long nextNumber, int increment) {
                this.increment = increment;
                this.nextNumber = nextNumber;
            }

            /** @return {@code nextNumber} modulo {@code increment}, narrowed to int. */
            public int getBaseNumber() {
                long remainder = this.nextNumber % this.increment;
                return (int) remainder;
            }

            /**
             * @return the increment rendered as a decimal string.
             *     NOTE(review): splits sharing an increment share this id - confirm that is intended.
             */
            public String splitId() {
                return Integer.toString(this.increment);
            }

            @Override
            public String toString() {
                return new StringBuilder()
                        .append("LongSplit{increment=")
                        .append(this.increment)
                        .append(", nextNumber=")
                        .append(this.nextNumber)
                        .append('}')
                        .toString();
            }
        }

        /**
         * Source event carrying the enumerator's restart and completed-checkpoint counters to the
         * readers, which overwrite their local counters with these values.
         */
        private static class SyncEvent
        implements SourceEvent {
            /** Restart count recorded by the enumerator when the event was created. */
            final int numRestarts;
            /** Completed-checkpoint count recorded by the enumerator when the event was created. */
            final int numCheckpoints;

            /**
             * @param numRestarts restart count to publish
             * @param numCheckpoints completed-checkpoint count to publish
             */
            SyncEvent(int numRestarts, int numCheckpoints) {
                this.numCheckpoints = numCheckpoints;
                this.numRestarts = numRestarts;
            }
        }

        /**
         * Source reader that emits one record per assigned {@link LongSplit} on every poll and
         * keeps doing so until enough checkpoints have completed and enough restarts have been
         * observed, at which point it reports end of input.
         *
         * <p>State flags:
         * <ul>
         *   <li>{@code finishing} - set by {@link #updatePollingState()}; makes
         *       {@link #pollNext} return {@code END_OF_INPUT}.
         *   <li>{@code recovered} - set after the first completed checkpoint notification; only
         *       then are "pumping" windows started.
         *   <li>{@code pumpingUntil} - while non-null and not overdue, {@link #pollNext} skips its
         *       1 ms sleep.
         * </ul>
         */
        private static class LongSourceReader
        implements SourceReader<Long, LongSplit> {
            private final int subtaskIndex;
            // Minimum number of completed checkpoints before the reader may finish.
            private final int minCheckpoints;
            // Minimum number of restarts before the reader may finish.
            private final int expectedRestarts;
            // Filled in close(); not read anywhere within this class.
            private final LongCounter numInputsCounter = new LongCounter();
            private final List<LongSplit> splits = new ArrayList<LongSplit>();
            // Length of one pumping window; initialised from the checkpointing interval.
            private final Duration pumpInterval;
            // Aborted-checkpoint notifications since the last completed one.
            private int numAbortedCheckpoints;
            private int numRestarts;
            private int numCompletedCheckpoints;
            private boolean finishing;
            private boolean recovered;
            // Optional per-record sleep in milliseconds; values <= 0 disable it.
            private final long sourceSleepMs;
            @Nullable
            private Deadline pumpingUntil = null;

            /**
             * @param subtaskIndex index used in log messages
             * @param minCheckpoints completed checkpoints required before finishing
             * @param expectedRestarts restarts required before finishing
             * @param checkpointingInterval milliseconds; becomes the pumping window length
             * @param sourceSleepMs milliseconds to sleep before each emitted record (0 = none)
             */
            public LongSourceReader(int subtaskIndex, int minCheckpoints, int expectedRestarts, long checkpointingInterval, long sourceSleepMs) {
                this.subtaskIndex = subtaskIndex;
                this.minCheckpoints = minCheckpoints;
                this.expectedRestarts = expectedRestarts;
                this.pumpInterval = Duration.ofMillis(checkpointingInterval);
                this.sourceSleepMs = sourceSleepMs;
            }

            public void start() {
            }

            /**
             * Emits the next number of every split (passing the number as the second argument of
             * {@code collect} as well) and advances each split by its increment.
             *
             * @return {@code END_OF_INPUT} once {@code finishing} is set, otherwise
             *     {@code MORE_AVAILABLE}
             * @throws InterruptedException if one of the sleeps is interrupted
             */
            public InputStatus pollNext(ReaderOutput<Long> output) throws InterruptedException {
                for (LongSplit split : this.splits) {
                    if (this.sourceSleepMs > 0L) {
                        Thread.sleep(this.sourceSleepMs);
                    }
                    output.collect((Object)UnalignedCheckpointTestBase.withHeader(split.nextNumber), split.nextNumber);
                    LongSplit longSplit = split;
                    longSplit.nextNumber = longSplit.nextNumber + (long)split.increment;
                }
                // The records above are still emitted on the call that reports END_OF_INPUT.
                if (this.finishing) {
                    return InputStatus.END_OF_INPUT;
                }
                // An expired pumping window is dropped.
                if (this.pumpingUntil != null && this.pumpingUntil.isOverdue()) {
                    this.pumpingUntil = null;
                }
                // Outside a pumping window every poll sleeps 1 ms (presumably to throttle output).
                if (this.pumpingUntil == null) {
                    Thread.sleep(1L);
                }
                return InputStatus.MORE_AVAILABLE;
            }

            /**
             * Ends any pumping window and returns the live split list (no copy is made).
             */
            public List<LongSplit> snapshotState(long checkpointId) {
                LOG.info("Snapshotted {} @ {} subtask ({} attempt)", new Object[]{this.splits, this.subtaskIndex, this.numRestarts});
                this.pumpingUntil = null;
                return this.splits;
            }

            /**
             * Records a completed checkpoint. Note the order: the log line and
             * {@link #updatePollingState()} both see the counter value from before the increment.
             */
            public void notifyCheckpointComplete(long checkpointId) {
                LOG.info("notifyCheckpointComplete {} @ {} subtask ({} attempt)", new Object[]{this.numCompletedCheckpoints, this.subtaskIndex, this.numRestarts});
                this.updatePollingState();
                ++this.numCompletedCheckpoints;
                this.recovered = true;
                this.numAbortedCheckpoints = 0;
            }

            /**
             * Counts an aborted checkpoint. Once the counter already exceeded 100 before this call,
             * the completed-checkpoint count is forced to {@code minCheckpoints + 1}, which
             * satisfies the checkpoint half of the finishing condition.
             */
            public void notifyCheckpointAborted(long checkpointId) {
                if (this.numAbortedCheckpoints++ > 100) {
                    this.numCompletedCheckpoints = this.minCheckpoints + 1;
                }
                this.updatePollingState();
            }

            /** @return an already completed future, i.e. the reader always reports availability. */
            public CompletableFuture<Void> isAvailable() {
                return FutureUtils.completedVoidFuture();
            }

            public void addSplits(List<LongSplit> splits) {
                this.splits.addAll(splits);
                this.updatePollingState();
                LOG.info("Added splits {}, finishing={}, pumping until {} @ {} subtask ({} attempt)", new Object[]{splits, this.finishing, this.pumpingUntil, this.subtaskIndex, this.numRestarts});
            }

            public void notifyNoMoreSplits() {
                this.updatePollingState();
            }

            /**
             * Sets {@code finishing} when both thresholds (completed checkpoints and restarts) are
             * met; otherwise, if a checkpoint has completed before ({@code recovered}), starts a
             * new pumping window of {@code pumpInterval} from now.
             */
            private void updatePollingState() {
                if (this.numCompletedCheckpoints >= this.minCheckpoints && this.numRestarts >= this.expectedRestarts) {
                    this.finishing = true;
                    LOG.info("Finishing @ {} subtask ({} attempt)", (Object)this.subtaskIndex, (Object)this.numRestarts);
                } else if (this.recovered) {
                    this.pumpingUntil = Deadline.fromNow((Duration)this.pumpInterval);
                    LOG.info("Pumping until {} @ {} subtask ({} attempt)", new Object[]{this.pumpingUntil, this.subtaskIndex, this.numRestarts});
                }
            }

            /**
             * On a {@link SyncEvent}, overwrites the local restart and completed-checkpoint
             * counters with the event's values and re-evaluates the polling state. Other events
             * are ignored.
             */
            public void handleSourceEvents(SourceEvent sourceEvent) {
                if (sourceEvent instanceof SyncEvent) {
                    this.numRestarts = ((SyncEvent)sourceEvent).numRestarts;
                    this.numCompletedCheckpoints = ((SyncEvent)sourceEvent).numCheckpoints;
                    LOG.info("Set restarts={}, numCompletedCheckpoints={} @ {} subtask ({} attempt)", new Object[]{this.numRestarts, this.numCompletedCheckpoints, this.subtaskIndex, this.numRestarts});
                    this.updatePollingState();
                }
            }

            /**
             * Adds {@code nextNumber / increment} (integer division) of every split to
             * {@code numInputsCounter}.
             */
            public void close() throws Exception {
                for (LongSplit split : this.splits) {
                    this.numInputsCounter.add(split.nextNumber / (long)split.increment);
                }
            }
        }
    }
}

