/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.NumberSequenceSourceWithWaitForCheckpoint;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;

public class CheckpointIntervalDuringBacklogITCase {
    /**
     * Split count used for the test sources (presumably the last constructor argument of
     * {@code NumberSequenceSourceWithWaitForCheckpoint} - confirm against that class).
     */
    private static final int NUM_SPLITS = 2;

    /** Total number of records a test job is expected to emit. */
    private static final int NUM_RECORDS = 40;

    /** The sorted output every test job must produce: the longs {@code 0 .. NUM_RECORDS - 1}. */
    private static final List<Long> EXPECTED_RESULT =
            LongStream.range(0L, NUM_RECORDS).boxed().collect(Collectors.toList());

    /**
     * Zeroes the static checkpoint counters kept by {@link CheckpointRecordingOperator} so that
     * counts recorded by one test do not leak into the assertions of the next one.
     */
    @After
    public void tearDown() {
        CheckpointRecordingOperator.reset();
    }

    /**
     * Runs a hybrid source with a regular checkpoint interval of 100 ms and a backlog checkpoint
     * interval of 200 ms, then checks that the recording operator saw at least one snapshot
     * before it had processed half of the records and at least one afterwards.
     *
     * <p>Both halves use {@code NumberSequenceSourceWithWaitForCheckpoint} (presumably this source
     * holds back records until a checkpoint has happened - confirm against that class).
     *
     * @throws Exception if job execution or result collection fails
     */
    @Test
    public void testCheckpoint() throws Exception {
        final Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(100L));
        configuration.set(
                CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, Duration.ofMillis(200L));

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1);

        // First half of the records comes from the first source, second half from the second one.
        final Source<Long, ?, ?> source =
                HybridSource.builder(
                                new NumberSequenceSourceWithWaitForCheckpoint(
                                        0L, NUM_RECORDS / 2 - 1, NUM_SPLITS))
                        .addSource(
                                new NumberSequenceSourceWithWaitForCheckpoint(
                                        NUM_RECORDS / 2, NUM_RECORDS - 1, NUM_SPLITS))
                        .build();

        this.runAndVerifyResult(env, source);

        Assertions.assertThat(CheckpointRecordingOperator.numCheckpointsBeforeSwitchSource.get())
                .isGreaterThan(0);
        Assertions.assertThat(CheckpointRecordingOperator.numCheckpointsAfterSwitchSource.get())
                .isGreaterThan(0);
    }

    /**
     * Runs a hybrid source with only the regular checkpoint interval (100 ms) configured, leaving
     * the backlog interval at its default, and checks that snapshots were recorded both before
     * and after the operator had processed half of the records.
     *
     * <p>The first half comes from a {@code NumberSequenceSourceWithWaitForCheckpoint}, the second
     * half from a plain {@code NumberSequenceSource}.
     *
     * @throws Exception if job execution or result collection fails
     */
    @Test
    public void testDefaultCheckpointIntervalDuringBacklog() throws Exception {
        final Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(100L));

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1);

        final Source<Long, ?, ?> source =
                HybridSource.builder(
                                new NumberSequenceSourceWithWaitForCheckpoint(
                                        0L, NUM_RECORDS / 2 - 1, NUM_SPLITS))
                        .addSource(new NumberSequenceSource(NUM_RECORDS / 2, NUM_RECORDS - 1))
                        .build();

        this.runAndVerifyResult(env, source);

        Assertions.assertThat(CheckpointRecordingOperator.numCheckpointsBeforeSwitchSource.get())
                .isGreaterThan(0);
        Assertions.assertThat(CheckpointRecordingOperator.numCheckpointsAfterSwitchSource.get())
                .isGreaterThan(0);
    }

    /**
     * Runs a hybrid source with a regular checkpoint interval of 100 ms and a backlog checkpoint
     * interval of 0 ms, then checks that the recording operator saw no snapshot before it had
     * processed half of the records and at least one afterwards.
     *
     * <p>NOTE(review): the assertions imply that the first hybrid sub-source is treated as backlog
     * and that a 0 ms backlog interval suppresses checkpoints - confirm against the Flink docs.
     *
     * @throws Exception if job execution or result collection fails
     */
    @Test
    public void testNoCheckpointDuringBacklog() throws Exception {
        final Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(100L));
        configuration.set(
                CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, Duration.ofMillis(0L));

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1);

        final Source<Long, ?, ?> source =
                HybridSource.builder(new NumberSequenceSource(0L, NUM_RECORDS / 2 - 1))
                        .addSource(new NumberSequenceSource(NUM_RECORDS / 2, NUM_RECORDS - 1))
                        .build();

        this.runAndVerifyResult(env, source);

        Assertions.assertThat(CheckpointRecordingOperator.numCheckpointsBeforeSwitchSource.get())
                .isEqualTo(0);
        Assertions.assertThat(CheckpointRecordingOperator.numCheckpointsAfterSwitchSource.get())
                .isGreaterThan(0);
    }

    /**
     * Unions a short source that reports itself as backlog (records 0 and 1) with a
     * {@code NumberSequenceSourceWithWaitForCheckpoint} emitting the remaining records, with the
     * backlog checkpoint interval set to 0 ms, and verifies that the job finishes with the
     * complete, sorted result.
     *
     * <p>NOTE(review): judging by the test name, the point is that the backlog flag of the
     * already-finished source must stop counting, otherwise the second source would presumably
     * wait for a checkpoint that never comes - confirm against the Flink implementation.
     *
     * @throws Exception if job execution or result collection fails
     */
    @Test
    public void testExcludeFinishedOperatorBacklogStatus() throws Exception {
        final Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(100L));
        configuration.set(
                CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, Duration.ofMillis(0L));

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1);

        // Creation order of the two sources is kept as in the original test.
        final DataStream<Long> backlogSource =
                env.fromSource(
                                new SourceWithBacklogReport<>(new NumberSequenceSource(0L, 1L), true),
                                WatermarkStrategy.noWatermarks(),
                                "backlog-source")
                        .returns(Long.class);
        final DataStream<Long> nonBacklogSource =
                env.fromSource(
                        new NumberSequenceSourceWithWaitForCheckpoint(2L, NUM_RECORDS - 1, NUM_SPLITS),
                        WatermarkStrategy.noWatermarks(),
                        "non-backlog-source");

        final DataStream<Long> stream =
                nonBacklogSource
                        .union(backlogSource)
                        .transform(
                                "CheckpointRecordingOperator",
                                Types.LONG,
                                new CheckpointRecordingOperator<>());

        final List<Long> result = new ArrayList<>();
        try (CloseableIterator<Long> iterator = stream.executeAndCollect()) {
            while (iterator.hasNext()) {
                result.add(iterator.next());
            }
        }

        // Arrival order across the unioned inputs is not fixed, so compare the sorted output.
        Collections.sort(result);
        Assertions.assertThat(result).containsExactly(EXPECTED_RESULT.toArray(new Long[0]));
    }

    /**
     * Builds a job that reads from {@code source}, passes every record through a
     * {@link CheckpointRecordingOperator}, executes it, and asserts that the collected records,
     * once sorted, are exactly {@link #EXPECTED_RESULT}.
     *
     * @param env the environment the job is built in and executed on
     * @param source the source under test, producing {@code Long} records
     * @throws Exception if job execution or result collection fails
     */
    private void runAndVerifyResult(StreamExecutionEnvironment env, Source<Long, ?, ?> source) throws Exception {
        final DataStream<Long> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "hybrid-source")
                        .returns(Long.class)
                        .transform(
                                "CheckpointRecordingOperator",
                                Types.LONG,
                                new CheckpointRecordingOperator<>());

        final List<Long> result = new ArrayList<>();
        try (CloseableIterator<Long> iterator = stream.executeAndCollect()) {
            while (iterator.hasNext()) {
                result.add(iterator.next());
            }
        }

        // Sort before comparing so the check does not depend on the order records arrive in.
        Collections.sort(result);
        Assertions.assertThat(result).containsExactly(EXPECTED_RESULT.toArray(new Long[0]));
    }

    private static class CheckpointRecordingOperator<T>
    extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {
        private static final AtomicInteger numCheckpointsBeforeSwitchSource = new AtomicInteger(0);
        private static final AtomicInteger numCheckpointsAfterSwitchSource = new AtomicInteger(0);
        private int numRecords = 0;

        private CheckpointRecordingOperator() {
        }

        private static void reset() {
            numCheckpointsBeforeSwitchSource.set(0);
            numCheckpointsAfterSwitchSource.set(0);
        }

        public void processElement(StreamRecord<T> element) {
            ++this.numRecords;
            this.output.collect(element);
        }

        public void snapshotState(StateSnapshotContext context) {
            if (this.numRecords < 20) {
                numCheckpointsBeforeSwitchSource.incrementAndGet();
            } else {
                numCheckpointsAfterSwitchSource.incrementAndGet();
            }
        }
    }

    private static class EnumeratorWithBacklogReport<SplitT extends SourceSplit, CheckpointT>
    implements SplitEnumerator<SplitT, CheckpointT> {
        private final SplitEnumerator<SplitT, CheckpointT> enumerator;
        private final SplitEnumeratorContext<SplitT> context;
        private final boolean isBacklog;

        private EnumeratorWithBacklogReport(SplitEnumerator<SplitT, CheckpointT> enumerator, SplitEnumeratorContext<SplitT> context, boolean isBacklog) {
            this.enumerator = enumerator;
            this.context = context;
            this.isBacklog = isBacklog;
        }

        public void start() {
            this.enumerator.start();
            this.context.setIsProcessingBacklog(this.isBacklog);
        }

        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
            this.enumerator.handleSplitRequest(subtaskId, requesterHostname);
        }

        public void addSplitsBack(List<SplitT> splits, int subtaskId) {
            this.enumerator.addSplitsBack(splits, subtaskId);
        }

        public void addReader(int subtaskId) {
            this.enumerator.addReader(subtaskId);
        }

        public CheckpointT snapshotState(long checkpointId) throws Exception {
            return (CheckpointT)this.enumerator.snapshotState(checkpointId);
        }

        public void close() throws IOException {
            this.enumerator.close();
        }
    }

    private static class SourceWithBacklogReport<T, SplitT extends SourceSplit, EnumChkT>
    implements Source<T, SplitT, EnumChkT> {
        private final Source<T, SplitT, EnumChkT> source;
        private final boolean isBacklog;

        private SourceWithBacklogReport(Source<T, SplitT, EnumChkT> source, boolean isBacklog) {
            this.source = source;
            this.isBacklog = isBacklog;
        }

        public Boundedness getBoundedness() {
            return this.source.getBoundedness();
        }

        public SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext) throws Exception {
            SplitEnumerator enumerator = this.source.createEnumerator(enumContext);
            return new EnumeratorWithBacklogReport(enumerator, enumContext, this.isBacklog);
        }

        public SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception {
            SplitEnumerator enumerator = this.source.restoreEnumerator(enumContext, checkpoint);
            return new EnumeratorWithBacklogReport(enumerator, enumContext, this.isBacklog);
        }

        public SimpleVersionedSerializer<SplitT> getSplitSerializer() {
            return this.source.getSplitSerializer();
        }

        public SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer() {
            return this.source.getEnumeratorCheckpointSerializer();
        }

        public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception {
            return this.source.createReader(readerContext);
        }
    }
}

