package stream.io;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;

/* loaded from: input_file:stream/io/DefaultQueue.class */
public class DefaultQueue extends AbstractQueue {
    static Logger log = LoggerFactory.getLogger((Class<?>) DefaultQueue.class);
    LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<>();
    final AtomicBoolean closed = new AtomicBoolean(false);
    int size = 100000;

    @Override // stream.io.Queue
    public Integer getSize() {
        return Integer.valueOf(this.queue.size());
    }

    @Override // stream.io.Barrel
    public int clear() {
        int size = this.queue.size();
        this.queue.clear();
        return size;
    }

    @Override // stream.io.Sink
    public void init() throws Exception {
        log.debug("Creating blocking queue of size {}", Integer.valueOf(this.size));
        this.queue = new LinkedBlockingQueue<>(this.size);
    }

    @Override // stream.io.Sink
    public boolean write(Data data) throws Exception {
        while (!this.queue.offer(data)) {
            try {
                log.debug("Failed to insert into queue... thread yielding");
                Thread.yield();
            } catch (Exception e) {
            }
        }
        log.debug("item inserted.");
        return true;
    }

    @Override // stream.io.Sink
    public boolean write(Collection<Data> collection) throws Exception {
        Iterator<Data> it = collection.iterator();
        while (it.hasNext()) {
            if (!write(it.next())) {
                return false;
            }
        }
        return true;
    }

    @Override // stream.io.Sink
    public void close() throws Exception {
        this.queue.clear();
        this.closed.set(false);
    }

    @Override // stream.io.Source
    public Data read() throws Exception {
        Data take;
        while (!this.closed.get()) {
            try {
                take = this.queue.take();
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (take != null) {
                return take;
            }
        }
        return null;
    }

    @Override // stream.io.QueueService
    public Data poll() {
        return this.queue.poll();
    }

    @Override // stream.io.QueueService
    public Data take() {
        try {
            return read();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override // stream.io.QueueService
    public boolean enqueue(Data data) {
        return this.queue.add(data);
    }

    @Override // stream.io.QueueService
    public int level() {
        return this.queue.size();
    }

    @Override // stream.io.QueueService
    public int capacity() {
        return this.queue.remainingCapacity();
    }

    @Override // stream.service.Service
    public void reset() throws Exception {
        this.queue.clear();
    }
}
