package org.apache.flink.api.connector.source.lib;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.mocks.TestingReaderContext;
import org.apache.flink.api.connector.source.mocks.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.class */
/**
 * Tests for the reader created by {@link NumberSequenceSource}.
 *
 * <p>The test repeatedly snapshots the reader's splits, throws the reader away, and restores a
 * fresh reader from the snapshot, then checks that the emitted records form one gap-free,
 * duplicate-free ascending sequence.
 */
public class NumberSequenceSourceTest {

    /** Number of {@code pollNext} calls between two simulated checkpoint/restore cycles. */
    private static final long POLLS_PER_CYCLE = 128L;

    /** First value (inclusive) covered by the splits handed to the reader. */
    private static final long SEQUENCE_FROM = 177L;

    /** Last value (inclusive) covered by the splits handed to the reader. */
    private static final long SEQUENCE_TO = 563L;

    /**
     * Polls the reader until end of input, replacing it with a new reader restored from a
     * snapshot every {@link #POLLS_PER_CYCLE} polls, and verifies the overall emitted sequence.
     */
    @Test
    public void testReaderCheckpoints() throws Exception {
        final TestingReaderOutput<Long> out = new TestingReaderOutput<>();

        SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> reader = createReader();
        // Two adjacent splits that together cover [SEQUENCE_FROM, SEQUENCE_TO].
        reader.addSplits(
                Arrays.asList(
                        new NumberSequenceSource.NumberSequenceSplit("split-1", SEQUENCE_FROM, 333L),
                        new NumberSequenceSource.NumberSequenceSplit("split-2", 334L, SEQUENCE_TO)));

        long remainingInCycle = POLLS_PER_CYCLE;
        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
            if (--remainingInCycle <= 0) {
                remainingInCycle = POLLS_PER_CYCLE;

                // "Checkpoint": capture the splits the current reader still holds ...
                final List<NumberSequenceSource.NumberSequenceSplit> splits =
                        reader.snapshotState(1L);

                // ... and "restore": continue with a brand-new reader fed from that snapshot.
                reader = createReader();
                reader.addSplits(splits);
            }
        }

        validateSequence(out.getEmittedRecords(), SEQUENCE_FROM, SEQUENCE_TO);
    }

    /**
     * Fails the test unless {@code list} is exactly the ascending sequence
     * {@code j, j + 1, ..., j2}.
     *
     * @param list the records that were emitted
     * @param j the first expected value (inclusive)
     * @param j2 the last expected value (inclusive)
     */
    private static void validateSequence(List<Long> list, long j, long j2) {
        if (list.size() != (j2 - j) + 1) {
            failSequence(list, j, j2);
        }

        long nextExpected = j;
        for (Long actual : list) {
            // Compare the emitted element itself against the running expectation; checking only
            // the size would let wrong, duplicated or reordered values slip through.
            if (actual.longValue() != nextExpected) {
                failSequence(list, j, j2);
            }
            nextExpected++;
        }
    }

    /**
     * Fails the current test with a message showing the expected bounds and the actual records.
     *
     * @param list the records that were emitted
     * @param j the first expected value (inclusive)
     * @param j2 the last expected value (inclusive)
     */
    private static void failSequence(List<Long> list, long j, long j2) {
        Assert.fail(
                String.format(
                        "Expected: A sequence [%d, %d], but found: sequence (size %d) : %s",
                        Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(list.size()), list));
    }

    /**
     * Creates a new reader backed by a {@link TestingReaderContext}.
     *
     * <p>NOTE(review): the {@code (0, 0)} bounds given to the source look irrelevant here because
     * the test assigns splits to the reader explicitly — confirm against NumberSequenceSource.
     */
    private static SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader() {
        return new NumberSequenceSource(0L, 0L).createReader(new TestingReaderContext());
    }
}
