package stream.io;

import cern.colt.GenericSorting;
import cern.colt.Swapper;
import cern.colt.function.IntComparator;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.annotations.Parameter;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

/* loaded from: input_file:stream/io/SnappyJoin.class */
public class SnappyJoin extends Join {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SnappyJoin.class);
    protected AtomicBoolean closed;
    private int count;
    private int reads;
    private int read;
    private String[] readQueue;
    private Data[] dataQueue;
    private long[] accs;

    /* renamed from: streams, reason: collision with root package name */
    private Set<String> f10streams;
    private String index;
    private String sync;
    private Map<String, SnappyBlockingQueue> queues;
    private Swapper swapper;
    private IntComparator comp;
    private final ReentrantLock takeLock;

    public SnappyJoin() {
        this.closed = new AtomicBoolean(false);
        this.queues = new ConcurrentHashMap();
        this.swapper = new Swapper() { // from class: stream.io.SnappyJoin.1
            @Override // cern.colt.Swapper
            public void swap(int i, int i2) {
                long j = SnappyJoin.this.accs[i];
                SnappyJoin.this.accs[i] = SnappyJoin.this.accs[i2];
                SnappyJoin.this.accs[i2] = j;
                Data data = SnappyJoin.this.dataQueue[i];
                SnappyJoin.this.dataQueue[i] = SnappyJoin.this.dataQueue[i2];
                SnappyJoin.this.dataQueue[i2] = data;
                String str = SnappyJoin.this.readQueue[i];
                SnappyJoin.this.readQueue[i] = SnappyJoin.this.readQueue[i2];
                SnappyJoin.this.readQueue[i2] = str;
            }
        };
        this.comp = new IntComparator() { // from class: stream.io.SnappyJoin.2
            @Override // cern.colt.function.IntComparator
            public int compare(int i, int i2) {
                if (SnappyJoin.this.accs[i] == SnappyJoin.this.accs[i2]) {
                    return 0;
                }
                return SnappyJoin.this.accs[i] < SnappyJoin.this.accs[i2] ? -1 : 1;
            }
        };
        this.takeLock = new ReentrantLock();
        this.count = 0;
        this.read = 0;
    }

    public SnappyJoin(int i) {
        this();
        if (i <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = i;
    }

    @Override // stream.io.Join
    public String getIndex() {
        return this.index;
    }

    @Override // stream.io.Join
    @Parameter(required = true, description = "the index key, e.g. a timestamp")
    public void setIndex(String str) {
        this.index = str;
    }

    @Override // stream.io.Join
    public String[] getStreams() {
        return (String[]) this.f10streams.toArray(new String[this.f10streams.size()]);
    }

    @Override // stream.io.Join
    @Parameter(required = true, description = "identifiers of the different streams (in Combination with sync key )")
    public void setStreams(String[] strArr) {
        this.f10streams = new HashSet();
        for (String str : strArr) {
            this.f10streams.add(str);
        }
    }

    @Override // stream.io.Join
    public String getSync() {
        return this.sync;
    }

    @Override // stream.io.Join
    @Parameter(required = true, description = "the sync key, e.g. @stream )")
    public void setSync(String str) {
        this.sync = str;
    }

    @Override // stream.io.Join
    public int size() {
        int i = Integer.MAX_VALUE;
        for (SnappyBlockingQueue snappyBlockingQueue : this.queues.values()) {
            if (i > snappyBlockingQueue.size()) {
                i = snappyBlockingQueue.size();
            }
        }
        return i;
    }

    @Override // stream.io.Join
    public int remainingCapacity() {
        return this.capacity - size();
    }

    @Override // stream.io.Join
    public boolean insert(Data data) {
        if (data == null || this.closed.get()) {
            return false;
        }
        Serializable serializable = data.get(this.sync);
        String str = null;
        if (serializable != null) {
            str = serializable.toString();
        }
        SnappyBlockingQueue snappyBlockingQueue = this.queues.get(str);
        if (snappyBlockingQueue == null) {
            return false;
        }
        try {
            snappyBlockingQueue.write(data);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return true;
        }
    }

    @Override // stream.io.Join, stream.io.Sink
    public void init() throws Exception {
        if (getCapacity().intValue() < 1) {
            throw new IllegalArgumentException("Invalid queue-capacity '" + getCapacity() + "'!");
        }
        if (this.index == null || this.index.isEmpty()) {
            throw new IllegalArgumentException("Index is not specified");
        }
        if (this.f10streams == null || this.f10streams.size() == 0) {
            throw new IllegalArgumentException("Index is not specified");
        }
        if (this.sync == null || this.sync.isEmpty()) {
            throw new IllegalArgumentException("Index is not specified");
        }
        Iterator<String> it = this.f10streams.iterator();
        while (it.hasNext()) {
            this.queues.put(it.next(), new SnappyBlockingQueue(this.capacity));
        }
        this.reads = this.f10streams.size();
        this.readQueue = new String[this.reads];
        this.dataQueue = new Data[this.reads];
        this.accs = new long[this.reads];
        int i = 0;
        Iterator<String> it2 = this.f10streams.iterator();
        while (it2.hasNext()) {
            this.readQueue[i] = it2.next();
            i++;
        }
    }

    @Override // stream.io.Join, stream.io.Sink
    public void close() throws Exception {
        log.debug("Closing queue '{}'...", getId());
        if (this.closed.get()) {
            log.debug("Queue '{}' already closed.", getId());
        } else {
            this.closed.getAndSet(true);
        }
    }

    @Override // stream.io.Join, stream.io.Source
    public Data read() throws Exception {
        log.trace("Reading from queue {}", getId());
        ReentrantLock reentrantLock = this.takeLock;
        reentrantLock.lockInterruptibly();
        try {
            if (this.closed.get() && this.count == 0) {
                log.debug("Queue '{}' is closed and empty => null", getId());
                reentrantLock.unlock();
                return null;
            }
            if (this.count == 0) {
                this.read = 0;
                for (int i = 0; i < this.reads; i++) {
                    this.dataQueue[i] = this.queues.get(this.readQueue[i]).read();
                    Serializable serializable = this.dataQueue[i].get(this.index);
                    if (serializable != null && (serializable instanceof Long)) {
                        this.accs[i] = ((Long) serializable).longValue();
                    }
                }
                GenericSorting.quickSort(0, this.f10streams.size(), this.comp, this.swapper);
                this.count = 0;
                for (int i2 = 1; i2 < this.f10streams.size() && this.accs[i2 - 1] == this.accs[i2]; i2++) {
                    this.count = i2;
                }
                this.count++;
                this.reads = this.count;
            }
            this.count--;
            Data data = this.dataQueue[this.read];
            this.read++;
            reentrantLock.unlock();
            return data;
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    @Override // stream.io.Join, stream.io.QueueService
    public Data poll() {
        throw new IllegalAccessError("Not Implemented");
    }

    @Override // stream.io.Join, stream.io.QueueService
    public Data take() {
        try {
            return read();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override // stream.io.Join, stream.io.Sink
    public boolean write(Data data) throws Exception {
        return insert(data);
    }

    @Override // stream.io.Join, stream.io.Sink
    public boolean write(Collection<Data> collection) throws Exception {
        throw new NotImplementedException();
    }

    @Override // stream.io.Join, stream.io.Barrel
    public int clear() {
        Iterator<SnappyBlockingQueue> it = this.queues.values().iterator();
        while (it.hasNext()) {
            it.next().clear();
        }
        return -1;
    }

    @Override // stream.io.Join, stream.io.QueueService
    public int level() {
        return size();
    }

    @Override // stream.io.Join, stream.io.QueueService
    public int capacity() {
        return this.capacity;
    }

    @Override // stream.io.Join, stream.io.Queue
    public Integer getSize() {
        return Integer.valueOf(size());
    }

    @Override // stream.io.Join, stream.service.Service
    public void reset() throws Exception {
    }

    @Override // stream.io.Join
    public String toString() {
        return "stream.io.Join['" + this.id + "']";
    }
}
