package org.apache.flink.connector.testutils.source.reader;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase.class */
public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends TestLogger {
    protected static final int NUM_SPLITS = 10;
    protected static final int NUM_RECORDS_PER_SPLIT = 10;
    protected static final int TOTAL_NUM_RECORDS = 100;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /* loaded from: input_file:org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase$ValidatingSourceOutput.class */
    public static class ValidatingSourceOutput implements ReaderOutput<Integer> {
        private Set<Integer> consumedValues = new HashSet();
        private int max = Integer.MIN_VALUE;
        private int min = Integer.MAX_VALUE;
        private int count = 0;

        public void collect(Integer num) {
            this.max = Math.max(num.intValue(), this.max);
            this.min = Math.min(num.intValue(), this.min);
            this.count++;
            this.consumedValues.add(num);
        }

        public void collect(Integer num, long j) {
            collect(num);
        }

        public void validate() {
            Assert.assertEquals(String.format("Should be %d distinct elements in total", Integer.valueOf(SourceReaderTestBase.TOTAL_NUM_RECORDS)), 100L, this.consumedValues.size());
            Assert.assertEquals(String.format("Should be %d elements in total", Integer.valueOf(SourceReaderTestBase.TOTAL_NUM_RECORDS)), 100L, this.count);
            Assert.assertEquals("The min value should be 0", 0L, this.min);
            Assert.assertEquals("The max value should be 99", 99L, this.max);
        }

        public int count() {
            return this.count;
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markIdle() {
        }

        public SourceOutput<Integer> createOutputForSplit(String str) {
            return this;
        }

        public void releaseOutputForSplit(String str) {
        }
    }

    @After
    public void ensureNoDangling() {
        Iterator<Thread> it = Thread.getAllStackTraces().keySet().iterator();
        while (it.hasNext()) {
            if (it.next().getName().equals("SourceFetcher")) {
                System.out.println("Dangling thread.");
            }
        }
    }

    @Test
    public void testRead() throws Exception {
        SourceReader<Integer, SplitT> createReader = createReader();
        Throwable th = null;
        try {
            createReader.addSplits(getSplits(10, 10, Boundedness.BOUNDED));
            ValidatingSourceOutput validatingSourceOutput = new ValidatingSourceOutput();
            while (validatingSourceOutput.count < TOTAL_NUM_RECORDS) {
                createReader.pollNext(validatingSourceOutput);
            }
            validatingSourceOutput.validate();
            if (createReader != null) {
                if (0 == 0) {
                    createReader.close();
                    return;
                }
                try {
                    createReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createReader != null) {
                if (0 != 0) {
                    try {
                        createReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createReader.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAddSplitToExistingFetcher() throws Exception {
        Thread.sleep(10L);
        ValidatingSourceOutput validatingSourceOutput = new ValidatingSourceOutput();
        SourceReader<Integer, SplitT> consumeRecords = consumeRecords(Collections.singletonList(getSplit(0, 10, Boundedness.BOUNDED)), validatingSourceOutput, 5);
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                for (int i = 1; i < 10; i++) {
                    arrayList.add(getSplit(i, 10, Boundedness.BOUNDED));
                }
                consumeRecords.addSplits(arrayList);
                while (validatingSourceOutput.count() < TOTAL_NUM_RECORDS) {
                    consumeRecords.pollNext(validatingSourceOutput);
                }
                validatingSourceOutput.validate();
                if (consumeRecords != null) {
                    if (0 == 0) {
                        consumeRecords.close();
                        return;
                    }
                    try {
                        consumeRecords.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (consumeRecords != null) {
                if (th != null) {
                    try {
                        consumeRecords.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    consumeRecords.close();
                }
            }
            throw th4;
        }
    }

    @Test(timeout = 30000)
    public void testPollingFromEmptyQueue() throws Exception {
        ValidatingSourceOutput validatingSourceOutput = new ValidatingSourceOutput();
        SourceReader<Integer, SplitT> consumeRecords = consumeRecords(Collections.singletonList(getSplit(0, 10, Boundedness.BOUNDED)), validatingSourceOutput, 10);
        Throwable th = null;
        try {
            Assert.assertEquals("The status should be ", InputStatus.NOTHING_AVAILABLE, consumeRecords.pollNext(validatingSourceOutput));
            if (consumeRecords != null) {
                if (0 == 0) {
                    consumeRecords.close();
                    return;
                }
                try {
                    consumeRecords.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (consumeRecords != null) {
                if (0 != 0) {
                    try {
                        consumeRecords.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    consumeRecords.close();
                }
            }
            throw th3;
        }
    }

    @Test(timeout = 30000)
    public void testAvailableOnEmptyQueue() throws Exception {
        SourceReader<Integer, SplitT> createReader = createReader();
        Throwable th = null;
        try {
            CompletableFuture isAvailable = createReader.isAvailable();
            Assert.assertFalse("There should be no records ready for poll.", isAvailable.isDone());
            createReader.addSplits(Collections.singletonList(getSplit(0, 10, Boundedness.BOUNDED)));
            isAvailable.get();
            if (createReader != null) {
                if (0 == 0) {
                    createReader.close();
                    return;
                }
                try {
                    createReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createReader != null) {
                if (0 != 0) {
                    try {
                        createReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createReader.close();
                }
            }
            throw th3;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test(timeout = 30000)
    public void testSnapshot() throws Exception {
        SourceReader consumeRecords = consumeRecords(getSplits(10, 10, Boundedness.CONTINUOUS_UNBOUNDED), new ValidatingSourceOutput(), TOTAL_NUM_RECORDS);
        Throwable th = null;
        try {
            List snapshotState = consumeRecords.snapshotState(1L);
            Assert.assertEquals("The snapshot should only have 10 splits. ", 10L, snapshotState.size());
            for (int i = 0; i < 10; i++) {
                Assert.assertEquals("The first four splits should have been fully consumed.", 10L, getNextRecordIndex((SourceSplit) snapshotState.get(i)));
            }
            if (consumeRecords != null) {
                if (0 == 0) {
                    consumeRecords.close();
                    return;
                }
                try {
                    consumeRecords.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (consumeRecords != null) {
                if (0 != 0) {
                    try {
                        consumeRecords.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    consumeRecords.close();
                }
            }
            throw th3;
        }
    }

    protected abstract SourceReader<Integer, SplitT> createReader() throws Exception;

    protected abstract List<SplitT> getSplits(int i, int i2, Boundedness boundedness);

    protected abstract SplitT getSplit(int i, int i2, Boundedness boundedness);

    protected abstract long getNextRecordIndex(SplitT splitt);

    private SourceReader<Integer, SplitT> consumeRecords(List<SplitT> list, ValidatingSourceOutput validatingSourceOutput, int i) throws Exception {
        SourceReader<Integer, SplitT> createReader = createReader();
        createReader.addSplits(list);
        while (validatingSourceOutput.count() < i) {
            createReader.pollNext(validatingSourceOutput);
        }
        return createReader;
    }
}
