package org.apache.flink.connector.base.source.reader.synchronization;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.class */
/**
 * Unit tests for {@link FutureCompletingBlockingQueue}.
 *
 * <p>Tests that spawn helper threads route them through {@link #startWorker} so that an assertion
 * failing on a helper thread is reported on the JUnit thread instead of being silently dropped.
 */
public class FutureCompletingBlockingQueueTest {
    /** Capacity the no-arg constructor is expected to use (see testQueueDefaultCapacity). */
    private static final int DEFAULT_CAPACITY = 2;

    /** Exercises size/isEmpty/remainingCapacity/peek/poll around a single element. */
    @Test
    public void testBasics() throws InterruptedException {
        final int capacity = 5;
        final FutureCompletingBlockingQueue<Integer> queue =
                new FutureCompletingBlockingQueue<>(capacity);
        // Grab the future BEFORE the put so we verify that this very instance gets completed.
        final CompletableFuture<?> future = queue.getAvailabilityFuture();

        Assertions.assertThat(queue.isEmpty()).isTrue();
        Assertions.assertThat(queue.size()).isEqualTo(0);

        queue.put(0, 1234);

        Assertions.assertThat(future.isDone()).isTrue();
        Assertions.assertThat(queue.size()).isEqualTo(1);
        Assertions.assertThat(queue.isEmpty()).isFalse();
        Assertions.assertThat(queue.remainingCapacity()).isEqualTo(capacity - 1);
        Assertions.assertThat(queue.peek()).isNotNull();
        Assertions.assertThat(queue.peek()).isEqualTo(1234);
        Assertions.assertThat(queue.poll()).isEqualTo(1234);

        Assertions.assertThat(queue.size()).isEqualTo(0);
        Assertions.assertThat(queue.isEmpty()).isTrue();
        Assertions.assertThat(queue.remainingCapacity()).isEqualTo(capacity);
    }

    /** A polled element is the one that was put. */
    @Test
    public void testPoll() throws InterruptedException {
        final FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>();
        queue.put(0, 1234);

        final Integer value = queue.poll();

        Assertions.assertThat(value).isNotNull();
        Assertions.assertThat(value).isEqualTo(1234);
    }

    /** Polling a drained queue returns null, repeatedly. */
    @Test
    public void testPollEmptyQueue() throws InterruptedException {
        final FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>();
        queue.put(0, 1234);

        Assertions.assertThat(queue.poll()).isNotNull();
        Assertions.assertThat(queue.poll()).isNull();
        Assertions.assertThat(queue.poll()).isNull();
    }

    /**
     * With capacity 1, the first put is expected to succeed and the second one to return {@code
     * false} once {@code wakeUpPuttingThread(0)} has been called.
     *
     * <p>NOTE(review): the wake-up may be issued before the worker reaches the second put; the
     * test presumably relies on the queue remembering the wake-up request - confirm against the
     * queue implementation.
     */
    @Test
    public void testWakeUpPut() throws InterruptedException {
        final FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1);
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

        final Thread putter =
                startWorker(
                        () -> {
                            try {
                                Assertions.assertThat(queue.put(0, 1234)).isTrue();
                                Assertions.assertThat(queue.put(0, 1234)).isFalse();
                                // Only reached when both assertions above held.
                                finished.countDown();
                            } catch (InterruptedException e) {
                                Assertions.fail("Interrupted unexpectedly.");
                            }
                        },
                        firstFailure);

        queue.wakeUpPuttingThread(0);

        // Join instead of awaiting the latch: a worker that fails an assertion never counts the
        // latch down, which would otherwise block this thread forever.
        putter.join();
        rethrowWorkerFailure(firstFailure);
        Assertions.assertThat(finished.getCount()).isEqualTo(0L);
    }

    /**
     * Several producers put disjoint ranges of integers while several consumers poll; every value
     * must be observed at most once and the consumers stop once all values have been counted.
     */
    @Test
    public void testConcurrency() throws InterruptedException {
        final int numProducers = 5;
        final int numConsumers = 5;
        final int valuesPerProducer = 10000;
        final int totalValues = numProducers * valuesPerProducer;

        final FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5);
        final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
        final List<Thread> workers = new ArrayList<>(numProducers + numConsumers);

        for (int p = 0; p < numProducers; p++) {
            final int producerIndex = p;
            final int base = producerIndex * valuesPerProducer;
            workers.add(
                    startWorker(
                            () -> {
                                for (int offset = 0; offset < valuesPerProducer; offset++) {
                                    try {
                                        queue.put(producerIndex, base + offset);
                                    } catch (InterruptedException e) {
                                        // Nothing in this test interrupts producers, so this is
                                        // a genuine failure; the helper records it.
                                        Assertions.fail("putting thread interrupted.");
                                    }
                                }
                            },
                            firstFailure));
        }

        final BitSet seen = new BitSet(totalValues);
        final AtomicInteger consumed = new AtomicInteger(0);
        for (int c = 0; c < numConsumers; c++) {
            workers.add(
                    startWorker(
                            () -> {
                                while (consumed.get() < totalValues) {
                                    final Integer value = queue.poll();
                                    if (value == null) {
                                        continue;
                                    }
                                    consumed.incrementAndGet();
                                    // BitSet is not thread-safe: the duplicate check and the
                                    // set must happen atomically under the same lock.
                                    synchronized (seen) {
                                        if (seen.get(value)) {
                                            // Record rather than throw so this consumer keeps
                                            // draining and the producers cannot get stuck.
                                            firstFailure.compareAndSet(
                                                    null,
                                                    new AssertionError(
                                                            "Value "
                                                                    + value
                                                                    + " has been consumed before"));
                                        }
                                        seen.set(value);
                                    }
                                }
                            },
                            firstFailure));
        }

        for (Thread worker : workers) {
            worker.join();
        }
        rethrowWorkerFailure(firstFailure);
    }

    /** The capacity passed to the constructor is reported as remaining capacity when empty. */
    @Test
    public void testSpecifiedQueueCapacity() {
        final int capacity = 8000;
        final FutureCompletingBlockingQueue<Object> queue =
                new FutureCompletingBlockingQueue<>(capacity);

        Assertions.assertThat(queue.remainingCapacity()).isEqualTo(capacity);
    }

    /** The no-arg constructor and the config option default agree on {@link #DEFAULT_CAPACITY}. */
    @Test
    public void testQueueDefaultCapacity() {
        final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();

        Assertions.assertThat(queue.remainingCapacity()).isEqualTo(DEFAULT_CAPACITY);
        Assertions.assertThat(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.defaultValue().intValue())
                .isEqualTo(DEFAULT_CAPACITY);
    }

    /** A fresh, empty queue hands out an incomplete availability future. */
    @Test
    public void testUnavailableWhenEmpty() {
        final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();

        Assertions.assertThat(queue.getAvailabilityFuture().isDone()).isFalse();
    }

    /** A future obtained after a put is already complete. */
    @Test
    public void testImmediatelyAvailableAfterPut() throws InterruptedException {
        final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        queue.put(0, new Object());

        Assertions.assertThat(queue.getAvailabilityFuture().isDone()).isTrue();
    }

    /** A future obtained before a put is completed by that put. */
    @Test
    public void testFutureBecomesAvailableAfterPut() throws InterruptedException {
        final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        final CompletableFuture<?> future = queue.getAvailabilityFuture();

        queue.put(0, new Object());

        Assertions.assertThat(future.isDone()).isTrue();
    }

    /** After the only element is polled, a newly obtained future is incomplete again. */
    @Test
    public void testUnavailableWhenBecomesEmpty() throws InterruptedException {
        final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        queue.put(0, new Object());
        queue.poll();

        Assertions.assertThat(queue.getAvailabilityFuture().isDone()).isFalse();
    }

    /** A future obtained after notifyAvailable() is already complete, even with no elements. */
    @Test
    public void testAvailableAfterNotifyAvailable() throws InterruptedException {
        final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        queue.notifyAvailable();

        Assertions.assertThat(queue.getAvailabilityFuture().isDone()).isTrue();
    }

    /** A future obtained before notifyAvailable() is completed by that call. */
    @Test
    public void testFutureBecomesAvailableAfterNotifyAvailable() throws InterruptedException {
        final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        final CompletableFuture<?> future = queue.getAvailabilityFuture();

        queue.notifyAvailable();

        Assertions.assertThat(future.isDone()).isTrue();
    }

    /**
     * A poll on an empty-but-notified queue leaves the previously obtained future complete while
     * the next obtained future is incomplete.
     */
    @Test
    public void testPollResetsAvailability() throws InterruptedException {
        final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        queue.notifyAvailable();

        final CompletableFuture<?> beforePoll = queue.getAvailabilityFuture();
        queue.poll();
        final CompletableFuture<?> afterPoll = queue.getAvailabilityFuture();

        Assertions.assertThat(beforePoll.isDone()).isTrue();
        Assertions.assertThat(afterPoll.isDone()).isFalse();
    }

    /** The queue's AVAILABLE constant is the very same instance as AvailabilityProvider's. */
    @Test
    public void testQueueUsesShortCircuitFuture() {
        Assertions.assertThat(FutureCompletingBlockingQueue.AVAILABLE)
                .isSameAs(AvailabilityProvider.AVAILABLE);
    }

    /**
     * Starts a thread running {@code task} and remembers the first {@link Throwable} that escapes
     * it, so the test thread can surface it after joining.
     *
     * @param task the work to run on the new thread
     * @param firstFailure receives the first escaped throwable; left untouched on success
     * @return the already started thread
     */
    private static Thread startWorker(Runnable task, AtomicReference<Throwable> firstFailure) {
        final Thread worker =
                new Thread(
                        () -> {
                            try {
                                task.run();
                            } catch (Throwable t) {
                                firstFailure.compareAndSet(null, t);
                            }
                        });
        worker.start();
        return worker;
    }

    /**
     * Fails the calling test if a worker recorded a failure. Call only after joining the workers.
     *
     * @param firstFailure holder filled by {@link #startWorker}
     * @throws AssertionError wrapping the recorded throwable as its cause
     */
    private static void rethrowWorkerFailure(AtomicReference<Throwable> firstFailure) {
        final Throwable failure = firstFailure.get();
        if (failure != null) {
            throw new AssertionError("Worker thread failed: " + failure, failure);
        }
    }
}
