package org.apache.flink.connector.base.source.reader.fetcher;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Queue;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.testutils.OneShotLatch;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.class */
public class SplitFetcherManagerTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest$AwaitingReader.class */
    public static final class AwaitingReader<E, SplitT extends SourceSplit> implements SplitReader<E, SplitT> {
        private final Queue<RecordsWithSplitIds<E>> fetches;
        private final IOException testError;
        private final OneShotLatch inBlocking = new OneShotLatch();
        private final OneShotLatch throwError = new OneShotLatch();

        @SafeVarargs
        AwaitingReader(IOException iOException, RecordsWithSplitIds<E>... recordsWithSplitIdsArr) {
            this.testError = iOException;
            this.fetches = new ArrayDeque(Arrays.asList(recordsWithSplitIdsArr));
        }

        public RecordsWithSplitIds<E> fetch() throws IOException {
            if (!this.fetches.isEmpty()) {
                return this.fetches.poll();
            }
            this.inBlocking.trigger();
            try {
                this.throwError.await();
                throw this.testError;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted");
            }
        }

        public void handleSplitsChanges(SplitsChange<SplitT> splitsChange) {
        }

        public void wakeUp() {
        }

        public void close() throws Exception {
        }

        public void awaitAllRecordsReturned() throws InterruptedException {
            this.inBlocking.await();
        }

        public void triggerThrowException() {
            this.throwError.trigger();
        }
    }

    /**
     * Runs the exception propagation scenario with a reader that has no results queued, so the
     * error is raised by the reader's very first fetch.
     */
    @Test
    public void testExceptionPropagationFirstFetch() throws Exception {
        // An empty varargs call passes a zero-length array, i.e. no fetches before the error.
        testExceptionPropagation();
    }

    /**
     * Runs the exception propagation scenario with a reader that returns two batches of records
     * for split "testSplit" before the error is raised.
     */
    @Test
    public void testExceptionPropagationSuccessiveFetch() throws Exception {
        // Diamond instead of raw types so the arguments are checked as RecordsWithSplitIds<Integer>.
        testExceptionPropagation(
                new TestingRecordsWithSplitIds<>("testSplit", 1, 2, 3, 4),
                new TestingRecordsWithSplitIds<>("testSplit", 5, 6, 7, 8));
    }

    /**
     * Closes a fetcher manager whose split reader was configured via
     * {@code setCloseWithException()} and verifies that {@code checkErrors()} afterwards reports
     * an exception whose root cause carries the expected message.
     */
    @Test
    public void testCloseFetcherWithException() throws Exception {
        TestingSplitReader<Object, TestingSourceSplit> failingReader = new TestingSplitReader<>();
        // Presumably makes the reader's close() throw the artificial exception asserted below.
        failingReader.setCloseWithException();
        SplitFetcherManager<Object, TestingSourceSplit> fetcherManager =
                createFetcher("test-split", new FutureCompletingBlockingQueue<>(), failingReader);
        fetcherManager.close(1000L);
        try {
            fetcherManager.checkErrors();
            // Without this the test would pass even if no error was reported at all.
            // Assert.fail throws AssertionError, which the catch (Exception) below does not swallow.
            Assert.fail("expected checkErrors() to report the exception thrown while closing the split reader");
        } catch (Exception e) {
            Assert.assertEquals("Artificial exception on closing the split reader.", ExceptionUtils.getRootCause(e).getMessage());
        }
    }

    /**
     * Shared scenario for the exception propagation tests: a reader hands out the given fetch
     * results, then fails with an {@link IOException}; the test checks that
     * {@code checkErrors()} on the fetcher manager throws with that same exception instance as
     * the cause of its cause.
     *
     * @param recordsWithSplitIdsArr the fetch results the reader returns before it fails
     * @throws Exception if waiting on the reader or queue, or closing the fetcher manager, fails
     */
    @SafeVarargs
    private final void testExceptionPropagation(RecordsWithSplitIds<Integer>... recordsWithSplitIdsArr) throws Exception {
        final IOException testingException = new IOException("test");
        final FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>> queue = new FutureCompletingBlockingQueue<>(10);
        final AwaitingReader<Integer, TestingSourceSplit> reader = new AwaitingReader<>(testingException, recordsWithSplitIdsArr);
        final SplitFetcherManager<Integer, TestingSourceSplit> fetcher = createFetcher("testSplit", queue, reader);

        // Wait until the reader has handed out all queued results, then empty the queue.
        reader.awaitAllRecordsReturned();
        drainQueue(queue);
        // Nothing is left to consume and the error has not been released yet.
        Assert.assertFalse(queue.getAvailabilityFuture().isDone());

        // Let the reader throw, then wait for the queue's availability future to complete.
        reader.triggerThrowException();
        queue.getAvailabilityFuture().get();

        try {
            fetcher.checkErrors();
            Assert.fail("expected exception");
        } catch (Exception e) {
            Assert.assertSame(testingException, e.getCause().getCause());
        } finally {
            // Single cleanup path: runs once whether the assertions pass or fail.
            fetcher.close(20000L);
        }
    }

    /**
     * Builds a {@link SingleThreadFetcherManager} over the given queue whose reader supplier
     * always returns the given reader, and assigns it a single {@link TestingSourceSplit}
     * created from {@code str}.
     *
     * @param str the argument passed to the {@link TestingSourceSplit} constructor
     * @param futureCompletingBlockingQueue the queue handed to the fetcher manager
     * @param splitReader the reader returned by the fetcher manager's reader supplier
     * @return the fetcher manager, with the split already added
     */
    private static <E> SplitFetcherManager<E, TestingSourceSplit> createFetcher(String str, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> futureCompletingBlockingQueue, SplitReader<E, TestingSourceSplit> splitReader) {
        final SingleThreadFetcherManager<E, TestingSourceSplit> manager =
                new SingleThreadFetcherManager<>(futureCompletingBlockingQueue, () -> splitReader);
        final TestingSourceSplit split = new TestingSourceSplit(str);
        manager.addSplits(Collections.singletonList(split));
        return manager;
    }

    /**
     * Polls the given queue repeatedly, discarding every element, until {@code poll()} returns
     * {@code null}.
     *
     * @param futureCompletingBlockingQueue the queue to empty
     */
    private static void drainQueue(FutureCompletingBlockingQueue<?> futureCompletingBlockingQueue) {
        while (futureCompletingBlockingQueue.poll() != null) {
            // Intentionally empty: polled elements are simply dropped.
        }
    }
}
