package org.apache.flink.connector.base.source.reader.fetcher;

import java.util.ArrayList;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.class */
public class SplitFetcherTest {

    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest$QueueDrainerThread.class */
    private static final class QueueDrainerThread extends CheckedThread {
        private final FutureCompletingBlockingQueue<?> queue;
        private final SplitFetcher<?, ?> fetcher;
        private final int numFetchesToTake;
        private volatile boolean wasIdleWhenFinished;

        QueueDrainerThread(FutureCompletingBlockingQueue<?> futureCompletingBlockingQueue, SplitFetcher<?, ?> splitFetcher, int i) {
            super("Queue Drainer");
            setPriority(10);
            this.queue = futureCompletingBlockingQueue;
            this.fetcher = splitFetcher;
            this.numFetchesToTake = i;
        }

        public void go() throws Exception {
            int i = this.numFetchesToTake;
            while (i > 0) {
                i--;
                this.queue.take();
            }
            this.wasIdleWhenFinished = this.fetcher.isIdle();
        }

        public boolean wasIdleWhenFinished() {
            return this.wasIdleWhenFinished;
        }
    }

    @Test
    public void testNewFetcherIsIdle() {
        Assert.assertTrue(createFetcher(new TestingSplitReader(new RecordsWithSplitIds[0])).isIdle());
    }

    @Test
    public void testFetcherNotIdleAfterSplitAdded() {
        SplitFetcher createFetcher = createFetcher(new TestingSplitReader(new RecordsWithSplitIds[0]));
        createFetcher.addSplits(Collections.singletonList(new TestingSourceSplit("test-split")));
        Assert.assertFalse(createFetcher.isIdle());
        while (createFetcher.assignedSplits().isEmpty()) {
            createFetcher.runOnce();
            Assert.assertFalse(createFetcher.isIdle());
        }
    }

    @Test
    public void testIdleAfterFinishedSplitsEnqueued() {
        SplitFetcher createFetcherWithSplit = createFetcherWithSplit("test-split", new TestingSplitReader(finishedSplitFetch("test-split")));
        createFetcherWithSplit.runOnce();
        Assert.assertTrue(createFetcherWithSplit.assignedSplits().isEmpty());
        Assert.assertTrue(createFetcherWithSplit.isIdle());
    }

    @Test
    public void testNotifiesWhenGoingIdle() {
        FutureCompletingBlockingQueue futureCompletingBlockingQueue = new FutureCompletingBlockingQueue();
        SplitFetcher createFetcherWithSplit = createFetcherWithSplit("test-split", futureCompletingBlockingQueue, new TestingSplitReader(finishedSplitFetch("test-split")));
        createFetcherWithSplit.runOnce();
        Assert.assertTrue(createFetcherWithSplit.assignedSplits().isEmpty());
        Assert.assertTrue(createFetcherWithSplit.isIdle());
        Assert.assertTrue(futureCompletingBlockingQueue.getAvailabilityFuture().isDone());
    }

    @Test
    public void testNotifiesOlderFutureWhenGoingIdle() {
        FutureCompletingBlockingQueue futureCompletingBlockingQueue = new FutureCompletingBlockingQueue();
        SplitFetcher createFetcherWithSplit = createFetcherWithSplit("test-split", futureCompletingBlockingQueue, new TestingSplitReader(finishedSplitFetch("test-split")));
        CompletableFuture availabilityFuture = futureCompletingBlockingQueue.getAvailabilityFuture();
        createFetcherWithSplit.runOnce();
        Assert.assertTrue(createFetcherWithSplit.assignedSplits().isEmpty());
        Assert.assertTrue(createFetcherWithSplit.isIdle());
        Assert.assertTrue(availabilityFuture.isDone());
    }

    @Test
    public void testNotifiesWhenGoingIdleConcurrent() throws Exception {
        FutureCompletingBlockingQueue futureCompletingBlockingQueue = new FutureCompletingBlockingQueue();
        SplitFetcher createFetcherWithSplit = createFetcherWithSplit("test-split", futureCompletingBlockingQueue, new TestingSplitReader(finishedSplitFetch("test-split")));
        QueueDrainerThread queueDrainerThread = new QueueDrainerThread(futureCompletingBlockingQueue, createFetcherWithSplit, 1);
        queueDrainerThread.start();
        createFetcherWithSplit.runOnce();
        queueDrainerThread.sync();
        Assert.assertTrue(futureCompletingBlockingQueue.getAvailabilityFuture().isDone() || queueDrainerThread.wasIdleWhenFinished());
    }

    @Test
    public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws Exception {
        FutureCompletingBlockingQueue futureCompletingBlockingQueue = new FutureCompletingBlockingQueue();
        SplitFetcher createFetcherWithSplit = createFetcherWithSplit("test-split", futureCompletingBlockingQueue, new TestingSplitReader(finishedSplitFetch("test-split")));
        QueueDrainerThread queueDrainerThread = new QueueDrainerThread(futureCompletingBlockingQueue, createFetcherWithSplit, 1);
        queueDrainerThread.start();
        CompletableFuture availabilityFuture = futureCompletingBlockingQueue.getAvailabilityFuture();
        createFetcherWithSplit.runOnce();
        Assert.assertTrue(availabilityFuture.isDone());
        queueDrainerThread.sync();
    }

    @Test
    public void testWakeup() throws InterruptedException {
        FutureCompletingBlockingQueue futureCompletingBlockingQueue = new FutureCompletingBlockingQueue(1);
        final SplitFetcher splitFetcher = new SplitFetcher(0, futureCompletingBlockingQueue, MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(2).setBlockingFetch(true).build(), ExceptionUtils::rethrow, () -> {
        }, collection -> {
        });
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 3; i++) {
            arrayList.add(new MockSourceSplit(i, 0, 10000));
            int i2 = i * 10000;
            for (int i3 = i2; i3 < i2 + 10000; i3++) {
                ((MockSourceSplit) arrayList.get(arrayList.size() - 1)).addRecord(i3);
            }
        }
        splitFetcher.addSplits(arrayList);
        Thread thread = new Thread((Runnable) splitFetcher, "FetcherThread");
        final SortedSet synchronizedSortedSet = Collections.synchronizedSortedSet(new TreeSet());
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread2 = new Thread("Wakeup Caller") { // from class: org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                int i4 = 0;
                while (synchronizedSortedSet.size() < 30000 && !atomicBoolean.get()) {
                    int size = synchronizedSortedSet.size();
                    if (size >= i4 + 10) {
                        splitFetcher.wakeUp(false);
                        atomicInteger.incrementAndGet();
                        i4 = size;
                    }
                }
            }
        };
        try {
            thread.start();
            thread2.start();
            while (synchronizedSortedSet.size() < 30000) {
                RecordsWithSplitIds recordsWithSplitIds = (RecordsWithSplitIds) futureCompletingBlockingQueue.take();
                while (recordsWithSplitIds.nextSplit() != null) {
                    while (true) {
                        int[] iArr = (int[]) recordsWithSplitIds.nextRecordFromSplit();
                        if (iArr != null) {
                            Assert.assertTrue(synchronizedSortedSet.add(Integer.valueOf(iArr[0])));
                        }
                    }
                }
            }
            Assert.assertEquals(30000L, synchronizedSortedSet.size());
            Assert.assertEquals(0L, ((Integer) synchronizedSortedSet.first()).intValue());
            Assert.assertEquals(29999L, ((Integer) synchronizedSortedSet.last()).intValue());
            Assert.assertTrue(atomicInteger.get() > 0);
            atomicBoolean.set(true);
            splitFetcher.shutdown();
            thread.join();
            thread2.join();
        } catch (Throwable th) {
            atomicBoolean.set(true);
            splitFetcher.shutdown();
            thread.join();
            thread2.join();
            throw th;
        }
    }

    @Test
    public void testClose() {
        TestingSplitReader testingSplitReader = new TestingSplitReader(new RecordsWithSplitIds[0]);
        SplitFetcher createFetcher = createFetcher(testingSplitReader);
        createFetcher.shutdown();
        createFetcher.run();
        Assert.assertTrue(testingSplitReader.isClosed());
    }

    private static <E> RecordsBySplits<E> finishedSplitFetch(String str) {
        return new RecordsBySplits<>(Collections.emptyMap(), Collections.singleton(str));
    }

    private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(SplitReader<E, TestingSourceSplit> splitReader) {
        return createFetcher(splitReader, new FutureCompletingBlockingQueue());
    }

    private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(SplitReader<E, TestingSourceSplit> splitReader, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> futureCompletingBlockingQueue) {
        return new SplitFetcher<>(0, futureCompletingBlockingQueue, splitReader, ExceptionUtils::rethrow, () -> {
        }, collection -> {
        });
    }

    private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(String str, SplitReader<E, TestingSourceSplit> splitReader) {
        return createFetcherWithSplit(str, new FutureCompletingBlockingQueue(), splitReader);
    }

    private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(String str, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> futureCompletingBlockingQueue, SplitReader<E, TestingSourceSplit> splitReader) {
        SplitFetcher<E, TestingSourceSplit> createFetcher = createFetcher(splitReader, futureCompletingBlockingQueue);
        createFetcher.addSplits(Collections.singletonList(new TestingSourceSplit(str)));
        while (createFetcher.assignedSplits().isEmpty()) {
            createFetcher.runOnce();
        }
        return createFetcher;
    }
}
