package org.apache.flink.api.connector.source.lib;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.class */
class NumberSequenceSourceTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest$DummyReaderContext.class */
    /**
     * Stub {@link SourceReaderContext} used to construct a reader in this test.
     *
     * <p>Every query method returns a fixed value and the two methods that would talk to the
     * coordinator ({@link #sendSplitRequest()} and {@link #sendSourceEventToCoordinator}) do
     * nothing.
     */
    public static final class DummyReaderContext implements SourceReaderContext {

        private DummyReaderContext() {}

        /** Returns a fresh, empty {@link Configuration} on every call. */
        public Configuration getConfiguration() {
            final Configuration emptyConfiguration = new Configuration();
            return emptyConfiguration;
        }

        /** Returns the group produced by {@code UnregisteredMetricsGroup}'s factory method. */
        public SourceReaderMetricGroup metricGroup() {
            final SourceReaderMetricGroup group =
                    UnregisteredMetricsGroup.createSourceReaderMetricGroup();
            return group;
        }

        /** Wraps the class loader that loaded this stub class. */
        public UserCodeClassLoader getUserCodeClassLoader() {
            // The class is final, so the class literal is the same object getClass() would return.
            final ClassLoader loader = DummyReaderContext.class.getClassLoader();
            return SimpleUserCodeClassLoader.create(loader);
        }

        /** Always reports {@code "localhost"}. */
        public String getLocalHostName() {
            return "localhost";
        }

        /** Always reports subtask index {@code 0}. */
        public int getIndexOfSubtask() {
            return 0;
        }

        /** Always reports a parallelism of {@code 1}. */
        public int currentParallelism() {
            return 1;
        }

        /** No-op: split requests are dropped. */
        public void sendSplitRequest() {
            // intentionally empty
        }

        /** No-op: the given event is dropped. */
        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
            // intentionally empty
        }
    }

    /* loaded from: input_file:org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest$TestingReaderOutput.class */
    /**
     * {@link ReaderOutput} that appends every collected element to an in-memory list so the test
     * can inspect what was emitted.
     *
     * <p>Watermark and idleness calls are not supported and throw
     * {@link UnsupportedOperationException}. Per-split outputs are not separated:
     * {@link #createOutputForSplit(String)} hands back this same instance.
     *
     * @param <E> type of the collected elements
     */
    private static final class TestingReaderOutput<E> implements ReaderOutput<E> {
        /** Elements in the order they were collected. */
        private final ArrayList<E> emittedRecords = new ArrayList<>();

        private TestingReaderOutput() {}

        /** Appends {@code e} to the recorded elements. */
        public void collect(E e) {
            emittedRecords.add(e);
        }

        /** Appends {@code e} to the recorded elements; the second argument is not used. */
        public void collect(E e, long j) {
            emittedRecords.add(e);
        }

        /** Not supported by this test output. */
        public void emitWatermark(Watermark watermark) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this test output. */
        public void markIdle() {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this test output. */
        public void markActive() {
            throw new UnsupportedOperationException();
        }

        /** Returns this instance, so all splits share the same record list. */
        public SourceOutput<E> createOutputForSplit(String str) {
            return this;
        }

        /** No-op: there is no per-split state to release. */
        public void releaseOutputForSplit(String str) {
            // intentionally empty
        }

        /** Returns the live (not copied) list of everything collected so far. */
        public ArrayList<E> getEmittedRecords() {
            return emittedRecords;
        }
    }

    /** Package-private no-argument constructor; performs no initialisation. */
    NumberSequenceSourceTest() {}

    /**
     * Reads two adjacent splits ({@code [177, 333]} and {@code [334, 563]}) while repeatedly
     * simulating a checkpoint-and-restore cycle, then checks that the emitted records form the
     * complete sequence {@code [177, 563]}.
     *
     * <p>After every {@code pollsPerCycle} polls that do not report end of input, the reader's
     * state is snapshotted, the reader is replaced by a fresh one, and the snapshot is handed to
     * the new reader (or, if the snapshot is empty, the new reader is told that no more splits
     * will arrive).
     *
     * @throws Exception if the reader fails while polling
     */
    @Test
    void testReaderCheckpoints() throws Exception {
        final long from = 177L;
        final long mid = 333L;
        final long to = 563L;
        // Number of non-terminal polls between two simulated checkpoints.
        final long pollsPerCycle = 128L;

        final TestingReaderOutput<Long> out = new TestingReaderOutput<>();

        SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> reader = createReader();
        reader.addSplits(
                Arrays.asList(
                        new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                        new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));

        long remainingInCycle = pollsPerCycle;
        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
            if (--remainingInCycle <= 0) {
                remainingInCycle = pollsPerCycle;

                // "Checkpoint": capture the splits the current reader still owns.
                final List<NumberSequenceSource.NumberSequenceSplit> splits =
                        reader.snapshotState(1L);

                // "Restore": continue with a brand-new reader fed from the snapshot.
                reader = createReader();
                if (splits.isEmpty()) {
                    reader.notifyNoMoreSplits();
                } else {
                    reader.addSplits(splits);
                }
            }
        }

        validateSequence(out.getEmittedRecords(), from, to);
    }

    /**
     * Verifies that {@code list} is exactly the consecutive sequence {@code j, j + 1, ..., j2}.
     *
     * <p>Both the size and every individual element are checked; on the first mismatch the
     * check is delegated to {@link #failSequence(List, long, long)}.
     *
     * @param list the values to check
     * @param j first expected value (inclusive)
     * @param j2 last expected value (inclusive)
     */
    private static void validateSequence(List<Long> list, long j, long j2) {
        if (list.size() != (j2 - j) + 1) {
            failSequence(list, j, j2);
        }

        long expected = j;
        for (final long actual : list) {
            // Compare the element itself against the running expected value; the previous
            // code compared a counter with itself, so wrong contents were never detected.
            if (actual != expected++) {
                failSequence(list, j, j2);
            }
        }
    }

    /**
     * Fails the running test with a message that states the expected range {@code [j, j2]} and
     * shows the size and contents of the list that was actually found.
     *
     * @param list the values that were found
     * @param j first expected value (inclusive)
     * @param j2 last expected value (inclusive)
     */
    private static void failSequence(List<Long> list, long j, long j2) {
        final String message =
                String.format(
                        "Expected: A sequence [%d, %d], but found: sequence (size %d) : %s",
                        j, j2, list.size(), list);
        Assertions.fail(message);
    }

    /**
     * Builds a new reader from a {@link NumberSequenceSource} and a {@link DummyReaderContext}.
     *
     * <p>The test hands splits to the reader explicitly via {@code addSplits}, so the
     * {@code 0..0} range given to the source here is presumably irrelevant to the reader —
     * TODO confirm against {@code NumberSequenceSource}.
     *
     * @return a freshly created reader with no splits assigned by this method
     */
    private static SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader() {
        final NumberSequenceSource source = new NumberSequenceSource(0L, 0L);
        final DummyReaderContext context = new DummyReaderContext();
        return source.createReader(context);
    }
}
