package stream.io;

import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.data.DataFactory;

/* loaded from: input_file:stream/io/DefaultBlockingQueue.class */
public class DefaultBlockingQueue implements Queue {
    static Logger log = LoggerFactory.getLogger((Class<?>) DefaultBlockingQueue.class);
    String id;
    final Data END_OF_STREAM = DataFactory.create();
    final AtomicBoolean closed = new AtomicBoolean(false);
    final LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<>();
    Object lock = new Object();

    @Override // stream.io.Barrel
    public int clear() {
        int size;
        synchronized (this.queue) {
            size = this.queue.size();
            this.queue.clear();
        }
        return size;
    }

    @Override // stream.io.Sink
    public String getId() {
        return this.id;
    }

    @Override // stream.io.Sink
    public void setId(String str) {
        this.id = str;
    }

    @Override // stream.io.Sink
    public void init() throws Exception {
    }

    @Override // stream.io.Sink
    public boolean write(Data data) throws Exception {
        log.trace("Receiving item {}", data);
        synchronized (this.closed) {
            if (this.closed.get()) {
                log.error("Attempt to write into closed queue '{}'!", this.id);
                return false;
            }
            log.trace("Adding data item to queue");
            boolean add = this.queue.add(data);
            if (add) {
                this.closed.notifyAll();
            } else {
                log.error("Failed to add item to queue: {}", data);
            }
            return add;
        }
    }

    @Override // stream.io.Sink
    public boolean write(Collection<Data> collection) throws Exception {
        synchronized (this.closed) {
            log.trace("Writing {} data items into queue", Integer.valueOf(collection.size()));
            if (this.closed.get()) {
                log.error("Attempt to write into closed queue '{}'!", this.id);
                return false;
            }
            return this.queue.addAll(collection);
        }
    }

    @Override // stream.io.Sink
    public void close() throws Exception {
        log.debug("Closing queue {}", toString());
        synchronized (this.closed) {
            this.closed.set(true);
            this.queue.add(this.END_OF_STREAM);
            this.closed.notifyAll();
        }
    }

    @Override // stream.io.Source
    public Data read() throws Exception {
        synchronized (this.closed) {
            if (this.closed.get() && this.queue.isEmpty()) {
                log.debug("queue '{}' closed, read() => null", getId());
                return null;
            }
            while (this.queue.isEmpty()) {
                log.trace("Waiting for new data to arrive {}", Thread.currentThread());
                this.closed.wait();
                if (this.closed.get()) {
                    log.debug("queue '{}' closed, read() => null", getId());
                    return null;
                }
            }
            log.trace("calling .take() on queue[closed={}] with {} elements", Boolean.valueOf(this.closed.get()), Integer.valueOf(this.queue.size()));
            Data take = this.queue.take();
            if (take != this.END_OF_STREAM) {
                return take;
            }
            log.debug("Found EOF!");
            return null;
        }
    }

    @Override // stream.io.Queue
    public void setCapacity(Integer num) {
    }

    @Override // stream.io.Queue
    public Integer getSize() {
        return Integer.valueOf(this.queue.size());
    }

    @Override // stream.io.Queue
    public Integer getCapacity() {
        return Integer.MAX_VALUE;
    }

    public String toString() {
        return super.toString() + "#['" + getId() + "']";
    }
}
