/*
 * Decompiled with CFR 0.152.
 */
package io.nflow.engine.internal.executor;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;

/**
 * A bounded {@link BlockingQueue} that delegates storage to a {@link LinkedBlockingQueue} and
 * additionally lets callers wait, on this object's monitor, until the number of queued elements
 * is at or below a configured threshold.
 *
 * <p>Every operation of this class that can remove elements calls {@link #notifyIfNotFull()}
 * afterwards, so that threads blocked in
 * {@link #waitUntilQueueSizeLowerThanThreshold(DateTime)} are woken as soon as the queue has
 * shrunk far enough.
 *
 * @param <E> the type of elements held in this queue
 */
public class ThresholdBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
    /** Backing queue that holds the elements and enforces the capacity bound. */
    private final LinkedBlockingQueue<E> queue;
    /** Waiters are released when the queue size is less than or equal to this value. */
    private final int notifyThreshHold;

    /**
     * Creates a queue with the given capacity and notification threshold.
     *
     * @param capacity maximum number of elements; passed as-is to
     *     {@link LinkedBlockingQueue#LinkedBlockingQueue(int)}
     * @param notifyThreshHold queue size at or below which waiting threads are notified
     */
    public ThresholdBlockingQueue(int capacity, int notifyThreshHold) {
        this.notifyThreshHold = notifyThreshHold;
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Wakes all threads waiting on this object's monitor if the current queue size is less than
     * or equal to the notification threshold. Does nothing otherwise.
     */
    public synchronized void notifyIfNotFull() {
        if (this.queue.size() <= this.notifyThreshHold) {
            this.notifyAll();
        }
    }

    /**
     * Blocks the calling thread while the queue size is greater than the notification threshold
     * and the given deadline has not yet been reached. Returns immediately if either condition
     * already fails on entry. Note that, despite the method name, the wait ends as soon as the
     * size is less than <em>or equal to</em> the threshold.
     *
     * @param waitUntil deadline after which this method returns regardless of queue size; the
     *     remaining time is computed against {@link DateTimeUtils#currentTimeMillis()}
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized void waitUntilQueueSizeLowerThanThreshold(DateTime waitUntil) throws InterruptedException {
        long sleep;
        // Re-check both conditions after every wake-up; wait(sleep) is only called with sleep > 0.
        while (this.queue.size() > this.notifyThreshHold && (sleep = waitUntil.getMillis() - DateTimeUtils.currentTimeMillis()) > 0L) {
            this.wait(sleep);
        }
    }

    /** Delegates to the backing queue's non-blocking {@code offer}. */
    @Override
    public boolean offer(E e) {
        return this.queue.offer(e);
    }

    /** Polls the backing queue without blocking, then notifies waiters if the threshold is met. */
    @Override
    public E poll() {
        E o = this.queue.poll();
        this.notifyIfNotFull();
        return o;
    }

    /** Delegates to the backing queue's {@code peek}. */
    @Override
    public E peek() {
        return this.queue.peek();
    }

    /**
     * Removes a single instance of the given element via the backing queue and, if something was
     * removed, notifies waiters when the threshold is met.
     *
     * <p>Overridden because the inherited implementation removes through the iterator; delegating
     * directly lets the backing queue perform the removal itself and guarantees the threshold
     * notification is issued.
     *
     * @param o element to remove
     * @return {@code true} if the backing queue changed as a result of the call
     */
    @Override
    public boolean remove(Object o) {
        boolean removed = this.queue.remove(o);
        if (removed) {
            this.notifyIfNotFull();
        }
        return removed;
    }

    /**
     * Returns an iterator over the backing queue's elements. Removing an element through the
     * returned iterator also notifies waiters if the threshold is met, since such a removal
     * shrinks the queue just like {@link #poll()} does.
     */
    @Override
    public Iterator<E> iterator() {
        final Iterator<E> delegate = this.queue.iterator();
        return new Iterator<E>() {
            @Override
            public boolean hasNext() {
                return delegate.hasNext();
            }

            @Override
            public E next() {
                return delegate.next();
            }

            @Override
            public void remove() {
                delegate.remove();
                ThresholdBlockingQueue.this.notifyIfNotFull();
            }
        };
    }

    /** Returns the number of elements currently in the backing queue. */
    @Override
    public int size() {
        return this.queue.size();
    }

    /** Delegates to the backing queue's blocking {@code put}. */
    @Override
    public void put(E e) throws InterruptedException {
        this.queue.put(e);
    }

    /** Delegates to the backing queue's timed {@code offer}. */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return this.queue.offer(e, timeout, unit);
    }

    /** Takes from the backing queue (blocking), then notifies waiters if the threshold is met. */
    @Override
    public E take() throws InterruptedException {
        E o = this.queue.take();
        this.notifyIfNotFull();
        return o;
    }

    /** Polls the backing queue with a timeout, then notifies waiters if the threshold is met. */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E o = this.queue.poll(timeout, unit);
        this.notifyIfNotFull();
        return o;
    }

    /** Delegates to the backing queue's {@code remainingCapacity}. */
    @Override
    public int remainingCapacity() {
        return this.queue.remainingCapacity();
    }

    /**
     * Drains the backing queue into {@code c}, then notifies waiters if the threshold is met.
     *
     * @return the number of elements transferred
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        int count = this.queue.drainTo(c);
        this.notifyIfNotFull();
        return count;
    }

    /**
     * Drains at most {@code maxElements} elements from the backing queue into {@code c}, then
     * notifies waiters if the threshold is met.
     *
     * @return the number of elements transferred
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        int count = this.queue.drainTo(c, maxElements);
        this.notifyIfNotFull();
        return count;
    }
}

