package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.NumberSequenceSourceWithWaitForCheckpoint;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.class */
public class CheckpointIntervalDuringBacklogITCase {
    /**
     * Value passed as the third constructor argument of every
     * {@code NumberSequenceSourceWithWaitForCheckpoint} built in this class
     * (presumably the number of splits the source is divided into - confirm against that class).
     */
    private static final int NUM_SPLITS = 2;

    /** Number of records every job in this class is expected to emit (the values 0..39). */
    private static final int NUM_RECORDS = 40;

    /**
     * The values {@code 0 .. NUM_RECORDS - 1} in ascending order; the sorted job output is
     * compared against this list.
     */
    private static final List<Long> EXPECTED_RESULT = LongStream.rangeClosed(0, NUM_RECORDS - 1).boxed().collect(Collectors.toList());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase$CheckpointRecordingOperator.class */
    /**
     * Pass-through operator that forwards every record unchanged and counts how often it is asked
     * to snapshot its state.
     *
     * <p>Each snapshot is attributed to one of two static counters depending on how many records
     * this instance has forwarded so far. Because the counters are static they are shared by all
     * instances and must be cleared with {@link #reset()} between tests.
     *
     * @param <T> type of the records flowing through the operator
     */
    public static class CheckpointRecordingOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        /**
         * Record count from which a snapshot is counted as "after switch source"; this is half of
         * the 40 records the enclosing test class emits per job.
         */
        private static final int SWITCH_SOURCE_RECORD_COUNT = 20;

        /** Snapshots taken while fewer than {@link #SWITCH_SOURCE_RECORD_COUNT} records were seen. */
        private static final AtomicInteger numCheckpointsBeforeSwitchSource = new AtomicInteger(0);

        /** Snapshots taken once at least {@link #SWITCH_SOURCE_RECORD_COUNT} records were seen. */
        private static final AtomicInteger numCheckpointsAfterSwitchSource = new AtomicInteger(0);

        /** Number of records this instance has forwarded so far. */
        private int numRecords = 0;

        private CheckpointRecordingOperator() {}

        /** Sets both static snapshot counters back to zero. */
        public static void reset() {
            numCheckpointsAfterSwitchSource.set(0);
            numCheckpointsBeforeSwitchSource.set(0);
        }

        /**
         * Counts the record and emits it unchanged.
         *
         * @param element the incoming record
         */
        public void processElement(StreamRecord<T> element) {
            this.numRecords += 1;
            this.output.collect(element);
        }

        /**
         * Increments the counter that matches the number of records seen at snapshot time.
         *
         * @param context snapshot context supplied by the runtime; not used here
         */
        public void snapshotState(StateSnapshotContext context) {
            final AtomicInteger counter =
                    this.numRecords >= SWITCH_SOURCE_RECORD_COUNT
                            ? numCheckpointsAfterSwitchSource
                            : numCheckpointsBeforeSwitchSource;
            counter.incrementAndGet();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase$EnumeratorWithBacklogReport.class */
    /**
     * Wrapper around a {@link SplitEnumerator} that, in addition to starting the wrapped
     * enumerator, reports a fixed backlog flag to the enumerator context.
     *
     * <p>NOTE(review): only the methods declared below are forwarded to the wrapped enumerator;
     * any other {@code SplitEnumerator} callbacks are not delegated - confirm this is sufficient
     * for the sources used in this test.
     *
     * @param <SplitT> split type handled by the wrapped enumerator
     * @param <CheckpointT> checkpoint type produced by the wrapped enumerator
     */
    private static class EnumeratorWithBacklogReport<SplitT extends SourceSplit, CheckpointT> implements SplitEnumerator<SplitT, CheckpointT> {
        /** Enumerator that does the actual work. */
        private final SplitEnumerator<SplitT, CheckpointT> delegate;

        /** Context the backlog flag is reported to. */
        private final SplitEnumeratorContext<SplitT> enumeratorContext;

        /** Backlog flag reported on {@link #start()}. */
        private final boolean backlog;

        private EnumeratorWithBacklogReport(SplitEnumerator<SplitT, CheckpointT> delegate, SplitEnumeratorContext<SplitT> enumeratorContext, boolean backlog) {
            this.delegate = delegate;
            this.enumeratorContext = enumeratorContext;
            this.backlog = backlog;
        }

        /** Starts the wrapped enumerator first and then reports the backlog flag to the context. */
        public void start() {
            this.delegate.start();
            this.enumeratorContext.setIsProcessingBacklog(this.backlog);
        }

        /** Forwards the split request to the wrapped enumerator. */
        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
            this.delegate.handleSplitRequest(subtaskId, requesterHostname);
        }

        /** Forwards the returned splits to the wrapped enumerator. */
        public void addSplitsBack(List<SplitT> splits, int subtaskId) {
            this.delegate.addSplitsBack(splits, subtaskId);
        }

        /** Forwards the reader registration to the wrapped enumerator. */
        public void addReader(int subtaskId) {
            this.delegate.addReader(subtaskId);
        }

        /**
         * Returns whatever the wrapped enumerator snapshots for the given checkpoint.
         *
         * @throws Exception if the wrapped enumerator fails to snapshot its state
         */
        public CheckpointT snapshotState(long checkpointId) throws Exception {
            return this.delegate.snapshotState(checkpointId);
        }

        /**
         * Closes the wrapped enumerator.
         *
         * @throws IOException if closing the wrapped enumerator fails
         */
        public void close() throws IOException {
            this.delegate.close();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase$SourceWithBacklogReport.class */
    /**
     * Wrapper around a {@link Source} whose enumerators are decorated with
     * {@link EnumeratorWithBacklogReport}, so that a fixed backlog flag is reported when an
     * enumerator is started. Everything else is delegated to the wrapped source unchanged.
     *
     * @param <T> record type produced by the wrapped source
     * @param <SplitT> split type of the wrapped source
     * @param <EnumChkT> enumerator checkpoint type of the wrapped source
     */
    private static class SourceWithBacklogReport<T, SplitT extends SourceSplit, EnumChkT> implements Source<T, SplitT, EnumChkT> {
        /** Source that does the actual work. */
        private final Source<T, SplitT, EnumChkT> source;

        /** Backlog flag handed to every enumerator created by this source. */
        private final boolean isBacklog;

        private SourceWithBacklogReport(Source<T, SplitT, EnumChkT> source, boolean isBacklog) {
            this.source = source;
            this.isBacklog = isBacklog;
        }

        /** Returns the boundedness of the wrapped source. */
        public Boundedness getBoundedness() {
            return this.source.getBoundedness();
        }

        /**
         * Creates the wrapped source's enumerator and decorates it with the backlog report.
         *
         * @throws Exception if the wrapped source fails to create its enumerator
         */
        public SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext) throws Exception {
            // Diamond instead of a raw type keeps the result fully type-checked.
            return new EnumeratorWithBacklogReport<>(this.source.createEnumerator(enumContext), enumContext, this.isBacklog);
        }

        /**
         * Restores the wrapped source's enumerator from a checkpoint and decorates it with the
         * backlog report.
         *
         * @throws Exception if the wrapped source fails to restore its enumerator
         */
        public SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception {
            return new EnumeratorWithBacklogReport<>(this.source.restoreEnumerator(enumContext, checkpoint), enumContext, this.isBacklog);
        }

        /** Returns the split serializer of the wrapped source. */
        public SimpleVersionedSerializer<SplitT> getSplitSerializer() {
            return this.source.getSplitSerializer();
        }

        /** Returns the enumerator checkpoint serializer of the wrapped source. */
        public SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer() {
            return this.source.getEnumeratorCheckpointSerializer();
        }

        /**
         * Creates a reader of the wrapped source.
         *
         * @throws Exception if the wrapped source fails to create its reader
         */
        public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception {
            return this.source.createReader(readerContext);
        }
    }

    /**
     * Clears the static snapshot counters of {@code CheckpointRecordingOperator} after each test
     * so that counts from one test do not carry over into the next.
     */
    @After
    public void tearDown() {
        CheckpointRecordingOperator.reset();
    }

    /**
     * Runs a hybrid-source job with a 100 ms checkpoint interval and a 200 ms interval during
     * backlog, then expects the recording operator to have taken at least one snapshot both
     * before and after it had seen 20 records.
     *
     * <p>NOTE(review): presumably the first source of the hybrid source is the one treated as
     * backlog - confirm against the {@code HybridSource} documentation.
     */
    @Test
    public void testCheckpoint() throws Exception {
        final Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(100L));
        conf.set(CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, Duration.ofMillis(200L));

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Records 0..19 come from the first source, 20..39 from the second one.
        final Source<Long, ?, ?> hybridSource =
                HybridSource.builder(new NumberSequenceSourceWithWaitForCheckpoint(0L, 19L, NUM_SPLITS))
                        .addSource(new NumberSequenceSourceWithWaitForCheckpoint(20L, 39L, NUM_SPLITS))
                        .build();
        runAndVerifyResult(env, hybridSource);

        final int snapshotsBeforeSwitch = CheckpointRecordingOperator.numCheckpointsBeforeSwitchSource.get();
        Assertions.assertThat(snapshotsBeforeSwitch).isGreaterThan(0);
        final int snapshotsAfterSwitch = CheckpointRecordingOperator.numCheckpointsAfterSwitchSource.get();
        Assertions.assertThat(snapshotsAfterSwitch).isGreaterThan(0);
    }

    /**
     * Runs a hybrid-source job with only the regular 100 ms checkpoint interval configured (no
     * explicit interval during backlog), then expects the recording operator to have taken at
     * least one snapshot both before and after it had seen 20 records.
     */
    @Test
    public void testDefaultCheckpointIntervalDuringBacklog() throws Exception {
        final Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(100L));

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Records 0..19 come from the first source, 20..39 from a plain NumberSequenceSource.
        final Source<Long, ?, ?> hybridSource =
                HybridSource.builder(new NumberSequenceSourceWithWaitForCheckpoint(0L, 19L, NUM_SPLITS))
                        .addSource(new NumberSequenceSource(20L, 39L))
                        .build();
        runAndVerifyResult(env, hybridSource);

        final int snapshotsBeforeSwitch = CheckpointRecordingOperator.numCheckpointsBeforeSwitchSource.get();
        Assertions.assertThat(snapshotsBeforeSwitch).isGreaterThan(0);
        final int snapshotsAfterSwitch = CheckpointRecordingOperator.numCheckpointsAfterSwitchSource.get();
        Assertions.assertThat(snapshotsAfterSwitch).isGreaterThan(0);
    }

    /**
     * Runs a hybrid-source job with a 100 ms checkpoint interval and a 0 ms interval during
     * backlog, then expects no snapshot before the recording operator had seen 20 records and at
     * least one snapshot afterwards.
     *
     * <p>NOTE(review): a zero interval during backlog presumably disables checkpoints while
     * backlog is being processed - confirm against the {@code CheckpointingOptions} docs.
     */
    @Test
    public void testNoCheckpointDuringBacklog() throws Exception {
        final Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(100L));
        conf.set(CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, Duration.ofMillis(0L));

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Both halves (0..19 and 20..39) are produced by plain NumberSequenceSource instances.
        final Source<Long, ?, ?> hybridSource =
                HybridSource.builder(new NumberSequenceSource(0L, 19L))
                        .addSource(new NumberSequenceSource(20L, 39L))
                        .build();
        runAndVerifyResult(env, hybridSource);

        final int snapshotsBeforeSwitch = CheckpointRecordingOperator.numCheckpointsBeforeSwitchSource.get();
        Assertions.assertThat(snapshotsBeforeSwitch).isEqualTo(0);
        final int snapshotsAfterSwitch = CheckpointRecordingOperator.numCheckpointsAfterSwitchSource.get();
        Assertions.assertThat(snapshotsAfterSwitch).isGreaterThan(0);
    }

    /**
     * Unions a source that reports itself as backlog (values 0..1) with a non-backlog source
     * (values 2..39) while the checkpoint interval during backlog is 0 ms, and verifies that the
     * job still emits exactly the values 0..39.
     *
     * <p>NOTE(review): judging by the test name, the job can presumably only finish if the
     * backlog status of the already finished backlog source is ignored, so that the non-backlog
     * source gets the checkpoint it waits for - confirm against
     * {@code NumberSequenceSourceWithWaitForCheckpoint}.
     *
     * @throws Exception if the job fails or the result iterator cannot be closed
     */
    @Test
    public void testExcludeFinishedOperatorBacklogStatus() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(100L));
        configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, Duration.ofMillis(0L));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        executionEnvironment.setParallelism(1);

        DataStream<Long> nonBacklogStream =
                executionEnvironment.fromSource(
                        new NumberSequenceSourceWithWaitForCheckpoint(2L, 39L, NUM_SPLITS),
                        WatermarkStrategy.noWatermarks(),
                        "non-backlog-source");
        DataStream<Long> backlogStream =
                executionEnvironment
                        .fromSource(
                                new SourceWithBacklogReport<>(new NumberSequenceSource(0L, 1L), true),
                                WatermarkStrategy.noWatermarks(),
                                "backlog-source")
                        .returns(Long.class);
        SingleOutputStreamOperator<Long> recorded =
                nonBacklogStream
                        .union(backlogStream)
                        .transform("CheckpointRecordingOperator", Types.LONG, new CheckpointRecordingOperator<>());

        List<Long> results = new ArrayList<>();
        // try-with-resources closes the iterator on every path, including a failing hasNext(),
        // and keeps the primary exception if close() fails as well.
        try (CloseableIterator<Long> iterator = recorded.executeAndCollect()) {
            while (iterator.hasNext()) {
                results.add(iterator.next());
            }
        }

        // The two unioned inputs interleave arbitrarily, so compare in sorted order.
        Collections.sort(results);
        Assertions.assertThat(results).containsExactly(EXPECTED_RESULT.toArray(new Long[0]));
    }

    /**
     * Builds a job that reads from {@code source}, routes every record through a
     * {@code CheckpointRecordingOperator}, executes it, and asserts that the sorted output equals
     * {@code EXPECTED_RESULT}.
     *
     * @param streamExecutionEnvironment environment the job is built and executed in
     * @param source source producing the {@code Long} records to verify
     * @throws Exception if the job fails or the result iterator cannot be closed
     */
    private void runAndVerifyResult(StreamExecutionEnvironment streamExecutionEnvironment, Source<Long, ?, ?> source) throws Exception {
        SingleOutputStreamOperator<Long> recorded =
                streamExecutionEnvironment
                        .fromSource(source, WatermarkStrategy.noWatermarks(), "hybrid-source")
                        .returns(Long.class)
                        .transform("CheckpointRecordingOperator", Types.LONG, new CheckpointRecordingOperator<>());

        List<Long> results = new ArrayList<>();
        // try-with-resources closes the iterator on every path, including a failing hasNext(),
        // and keeps the primary exception if close() fails as well.
        try (CloseableIterator<Long> iterator = recorded.executeAndCollect()) {
            while (iterator.hasNext()) {
                results.add(iterator.next());
            }
        }

        // Sort before comparing so the assertion does not depend on arrival order.
        Collections.sort(results);
        Assertions.assertThat(results).containsExactly(EXPECTED_RESULT.toArray(new Long[0]));
    }
}
