package org.apache.flink.connector.base.source.reader.synchronization;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.class */
/**
 * A bounded queue whose "has elements" state is exposed as a {@link CompletableFuture}.
 *
 * <p>Producers call {@link #put(int, Object)} and block on a per-index {@link Condition} while
 * the queue is at capacity. Consumers call {@link #poll()} (non-blocking) and may wait on the
 * future returned by {@link #getAvailabilityFuture()}, which is completed when an element is
 * added to an empty queue or when {@link #notifyAvailable()} is called.
 *
 * <p>All queue state is mutated while holding {@link #lock}.
 *
 * @param <T> the type of the elements held in the queue
 */
public class FutureCompletingBlockingQueue<T> {
    /**
     * The shared, already-completed future that represents the "available" state. Identity
     * comparison against this instance is used to tell the available and unavailable states apart.
     */
    public static final CompletableFuture<Void> AVAILABLE = getAvailableFuture();

    /** Capacity used by the no-argument constructor. */
    private static final int DEFAULT_CAPACITY = 2;

    /** Maximum number of elements the queue holds before {@code put} blocks. */
    private final int capacity;

    /**
     * The current availability future; either {@link #AVAILABLE} or a not-yet-completed future.
     * It is written only under {@link #lock}, but {@link #getAvailabilityFuture()} reads it
     * without locking. NOTE(review): the unlocked read looks deliberate - confirm before changing.
     */
    private CompletableFuture<Void> currentFuture;

    /** The lock guarding all mutable state of this queue. */
    private final Lock lock;

    /** The buffered elements. */
    @GuardedBy("lock")
    private final Queue<T> queue;

    /** Conditions of putting threads that are waiting for free space, in arrival order. */
    @GuardedBy("lock")
    private final Queue<Condition> notFull;

    /** Per-thread-index condition and wake-up flag; grown lazily, entries may be null. */
    @GuardedBy("lock")
    private ConditionAndFlag[] putConditionAndFlags;

    /** Pairs the condition a putting thread waits on with that thread's wake-up flag. */
    public static class ConditionAndFlag {
        private final Condition cond;
        private boolean wakeUp;

        private ConditionAndFlag(Condition condition) {
            this.cond = condition;
            this.wakeUp = false;
        }

        /** Returns the condition the putting thread waits on. */
        public Condition condition() {
            return cond;
        }

        /** Returns whether a wake-up has been requested and not yet consumed. */
        public boolean getWakeUp() {
            return wakeUp;
        }

        /** Sets or clears the wake-up request. */
        public void setWakeUp(boolean wakeUp) {
            this.wakeUp = wakeUp;
        }
    }

    /** Creates a queue with the default capacity. */
    public FutureCompletingBlockingQueue() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Creates a queue with the given capacity.
     *
     * @param capacity the maximum number of buffered elements; must be greater than zero
     */
    public FutureCompletingBlockingQueue(int capacity) {
        Preconditions.checkArgument(capacity > 0, "capacity must be > 0");
        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
        this.lock = new ReentrantLock();
        this.putConditionAndFlags = new ConditionAndFlag[1];
        this.notFull = new ArrayDeque<>();
        // The queue starts empty, hence in the "unavailable" state.
        this.currentFuture = new CompletableFuture<>();
    }

    /**
     * Returns the current availability future. A completed future only reflects the state at the
     * time of the call; a subsequent {@link #poll()} may still return {@code null}.
     *
     * @return the current availability future, never assigned {@code null} by this class
     */
    public CompletableFuture<Void> getAvailabilityFuture() {
        return currentFuture;
    }

    /**
     * Moves the queue to the available state and completes the pending availability future, even
     * if no element was added.
     */
    public void notifyAvailable() {
        lock.lock();
        try {
            moveToAvailable();
        } finally {
            lock.unlock();
        }
    }

    /** Switches to {@link #AVAILABLE} and completes the previously pending future, if any. */
    @GuardedBy("lock")
    private void moveToAvailable() {
        final CompletableFuture<Void> pending = currentFuture;
        if (pending != AVAILABLE) {
            // Publish the new state first, then complete the old future.
            currentFuture = AVAILABLE;
            pending.complete(null);
        }
    }

    /** Replaces {@link #AVAILABLE} with a fresh, uncompleted future; no-op if already unavailable. */
    @GuardedBy("lock")
    private void moveToUnAvailable() {
        if (currentFuture == AVAILABLE) {
            currentFuture = new CompletableFuture<>();
        }
    }

    /**
     * Adds an element, blocking while the queue is at capacity.
     *
     * <p>If the wake-up flag for {@code threadIndex} is set while the queue is full, the flag is
     * reset and the method returns {@code false} without adding the element.
     *
     * @param threadIndex the index identifying the putting thread's condition and wake-up flag
     * @param element the element to add; must not be {@code null}
     * @return {@code true} if the element was added, {@code false} if the call was woken up
     * @throws NullPointerException if {@code element} is {@code null}
     * @throws InterruptedException if interrupted while acquiring the lock or waiting for space
     */
    public boolean put(int threadIndex, T element) throws InterruptedException {
        if (element == null) {
            throw new NullPointerException();
        }
        lock.lockInterruptibly();
        // The whole check-wait-enqueue sequence must run under the lock: Condition.await()
        // requires the lock to be held, and it re-acquires the lock before returning, so the
        // single unlock in the finally block is always balanced.
        try {
            while (queue.size() >= capacity) {
                if (getAndResetWakeUpFlag(threadIndex)) {
                    return false;
                }
                waitOnPut(threadIndex);
            }
            enqueue(element);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves the next element, waiting on the availability future while the queue is empty.
     *
     * @return the next element, never {@code null}
     * @throws InterruptedException if interrupted while waiting on the availability future
     * @throws FlinkRuntimeException if the availability future completed exceptionally
     */
    @VisibleForTesting
    public T take() throws InterruptedException {
        while (true) {
            final T next = poll();
            if (next != null) {
                return next;
            }
            try {
                getAvailabilityFuture().get();
            } catch (CompletionException | ExecutionException e) {
                throw new FlinkRuntimeException("exception in queue future completion", e);
            }
        }
    }

    /**
     * Removes and returns the head of the queue without blocking. If the queue is empty, the
     * queue is moved to the unavailable state.
     *
     * @return the head element, or {@code null} if the queue is empty
     */
    public T poll() {
        lock.lock();
        try {
            if (queue.size() != 0) {
                return dequeue();
            }
            moveToUnAvailable();
            return null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the head of the queue without removing it.
     *
     * @return the head element, or {@code null} if the queue is empty
     */
    public T peek() {
        lock.lock();
        try {
            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    /** Returns the number of elements currently in the queue. */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    /** Returns {@code true} if the queue currently holds no elements. */
    public boolean isEmpty() {
        lock.lock();
        try {
            return queue.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    /** Returns the capacity minus the current number of elements. */
    public int remainingCapacity() {
        lock.lock();
        try {
            return capacity - queue.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Sets the wake-up flag for the given thread index and signals its condition, so that a
     * {@link #put(int, Object)} with that index which finds the queue full returns {@code false}.
     *
     * @param threadIndex the index of the putting thread to wake up
     */
    public void wakeUpPuttingThread(int threadIndex) {
        lock.lock();
        try {
            maybeCreateCondition(threadIndex);
            final ConditionAndFlag caf = putConditionAndFlags[threadIndex];
            if (caf != null) {
                caf.setWakeUp(true);
                caf.condition().signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends the element, completes the availability future if the queue was empty, and signals
     * the next waiting putter if there is still room left after this insertion.
     */
    @GuardedBy("lock")
    private void enqueue(T element) {
        final int sizeBefore = queue.size();
        queue.add(element);
        if (sizeBefore == 0) {
            moveToAvailable();
        }
        // sizeBefore < capacity - 1 means the queue is not full even after this add.
        if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
            signalNextPutter();
        }
    }

    /**
     * Removes the head element, signals the next waiting putter if the queue was full before the
     * removal, and moves to the unavailable state if the queue became empty.
     */
    @GuardedBy("lock")
    private T dequeue() {
        final int sizeBefore = queue.size();
        final T element = queue.poll();
        if (sizeBefore == capacity && !notFull.isEmpty()) {
            signalNextPutter();
        }
        if (queue.isEmpty()) {
            moveToUnAvailable();
        }
        return element;
    }

    /**
     * Registers the caller's condition in {@link #notFull} and waits on it.
     *
     * <p>NOTE(review): the condition is only removed from {@code notFull} by
     * {@link #signalNextPutter()}; if the wait ends through a wake-up or an interrupt, the entry
     * appears to stay queued - confirm whether that can swallow a later signal.
     */
    @GuardedBy("lock")
    private void waitOnPut(int threadIndex) throws InterruptedException {
        maybeCreateCondition(threadIndex);
        final Condition cond = putConditionAndFlags[threadIndex].condition();
        notFull.add(cond);
        cond.await();
    }

    /** Signals the longest-waiting registered putter condition, if there is one. */
    @GuardedBy("lock")
    private void signalNextPutter() {
        if (notFull.isEmpty()) {
            return;
        }
        notFull.poll().signal();
    }

    /** Grows the per-index array if needed and creates the entry for the index if absent. */
    @GuardedBy("lock")
    private void maybeCreateCondition(int threadIndex) {
        if (putConditionAndFlags.length < threadIndex + 1) {
            putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1);
        }
        if (putConditionAndFlags[threadIndex] == null) {
            putConditionAndFlags[threadIndex] = new ConditionAndFlag(lock.newCondition());
        }
    }

    /** Returns the wake-up flag for the index and clears it if it was set. */
    @GuardedBy("lock")
    private boolean getAndResetWakeUpFlag(int threadIndex) {
        maybeCreateCondition(threadIndex);
        final ConditionAndFlag caf = putConditionAndFlags[threadIndex];
        if (!caf.getWakeUp()) {
            return false;
        }
        caf.setWakeUp(false);
        return true;
    }

    /**
     * Reflectively reads the static {@code AVAILABLE} field of
     * {@code org.apache.flink.runtime.io.AvailabilityProvider}. If the lookup fails for any reason
     * (for example the class is not on the classpath), a new completed future is returned instead.
     */
    @SuppressWarnings("unchecked") // the reflectively read field is cast without a type check
    private static CompletableFuture<Void> getAvailableFuture() {
        try {
            final Class<?> providerClass =
                    Class.forName("org.apache.flink.runtime.io.AvailabilityProvider");
            return (CompletableFuture<Void>) providerClass.getDeclaredField("AVAILABLE").get(null);
        } catch (Throwable ignored) {
            // Deliberate best-effort: any failure falls back to a locally created future.
            return CompletableFuture.completedFuture(null);
        }
    }
}
