package org.apache.hyracks.dataflow.std.collectors;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.BitSet;
import org.apache.hyracks.api.channels.IInputChannel;
import org.apache.hyracks.api.channels.IInputChannelMonitor;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.class */
/**
 * Reads frames from a set of sender partitions in whatever order they become available.
 *
 * <p>For every sender index the reader tracks the registered {@link IInputChannel}, the number of
 * frames reported as available, and whether the sender has reached end-of-stream, has failed, or
 * has been closed. The channel callbacks ({@link #notifyDataAvailability},
 * {@link #notifyEndOfStream}, {@link #notifyFailure}) update that state and wake up readers
 * blocked in {@link #findNextSender()} or {@link #getNextBuffer(int)}.
 *
 * <p>The bookkeeping state is guarded by this object's monitor: the mutating methods are
 * {@code synchronized} and use {@code wait()}/{@code notifyAll()} for signalling.
 */
public class NonDeterministicChannelReader implements IInputChannelMonitor, IPartitionAcceptor {
    private static final Logger LOGGER = LogManager.getLogger();

    /** Total number of sender partitions (size of all per-sender structures). */
    private final int nSenderPartitions;
    /** Input channel per sender index; an entry is {@code null} until registered or after {@link #close()}. */
    private final IInputChannel[] channels;
    /** Bit per sender: set while {@link #availableFrameCounts} for that sender is positive. */
    private final BitSet frameAvailability;
    /** Number of frames reported available, and not yet consumed, per sender. */
    private final int[] availableFrameCounts;
    /** Senders that signalled end-of-stream (or failure) and whose channel has not been closed yet. */
    private final BitSet eosSenders;
    /** Senders for which a failure was reported. */
    private final BitSet failSenders;
    /** Senders that need no further closing: either closed already or never expected. */
    private final BitSet closedSenders;
    /** Sender index returned by the last {@link #findNextSender()} call; used for round-robin scanning. */
    private int lastReadSender;
    /** Whether the most recently reported failure carried error code {@code -1}. */
    private boolean localFailure;

    /**
     * Creates a reader for {@code i} sender partitions.
     *
     * @param i number of sender partitions
     * @param bitSet the sender indexes this reader should expect; every index in {@code [0, i)}
     *        that is NOT set here is treated as already closed from the start
     */
    public NonDeterministicChannelReader(int i, BitSet bitSet) {
        this.nSenderPartitions = i;
        this.channels = new IInputChannel[i];
        this.eosSenders = new BitSet(i);
        this.failSenders = new BitSet(i);
        this.closedSenders = new BitSet(i);
        // closedSenders := complement of the expected senders over [0, i).
        this.closedSenders.or(bitSet);
        this.closedSenders.flip(0, i);
        this.frameAvailability = new BitSet(i);
        this.availableFrameCounts = new int[i];
    }

    /**
     * Registers this reader as the monitor of the given channel, attaches the partition id to the
     * channel, and stores the channel under the partition's sender index.
     *
     * @param partitionId identifies the partition; its sender index selects the channel slot
     * @param iInputChannel the channel delivering that partition's frames
     */
    @Override
    public void addPartition(PartitionId partitionId, IInputChannel iInputChannel) {
        iInputChannel.registerMonitor(this);
        iInputChannel.setAttachment(partitionId);
        synchronized (this) {
            this.channels[partitionId.getSenderIndex()] = iInputChannel;
        }
    }

    /**
     * @return the number of sender partitions this reader was created for
     */
    public int getSenderPartitionCount() {
        return this.nSenderPartitions;
    }

    /**
     * Blocks until sender {@code i} has at least one available frame, consumes one from the
     * availability count, and returns the next buffer of that sender's channel.
     *
     * @param i sender index
     * @return the next buffer obtained from the sender's channel
     * @throws HyracksDataException if the waiting thread is interrupted (the interrupt flag is
     *         restored before throwing)
     */
    public synchronized ByteBuffer getNextBuffer(int i) throws HyracksDataException {
        while (this.availableFrameCounts[i] <= 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers further up can still observe it,
                // matching the handling in findNextSender().
                Thread.currentThread().interrupt();
                throw HyracksDataException.create(e);
            }
        }
        if (--this.availableFrameCounts[i] == 0) {
            // Last pending frame consumed: the sender no longer counts as "has data".
            this.frameAvailability.clear(i);
        }
        return this.channels[i].getNextBuffer();
    }

    /**
     * Hands a buffer back to the channel of sender {@code i}.
     *
     * @param i sender index
     * @param byteBuffer the buffer to recycle
     */
    public void recycleBuffer(int i, ByteBuffer byteBuffer) {
        this.channels[i].recycleBuffer(byteBuffer);
    }

    /**
     * Resets the round-robin position so the next {@link #findNextSender()} scan starts at
     * sender 0.
     *
     * @throws HyracksDataException declared for callers; not thrown by this implementation
     */
    public void open() throws HyracksDataException {
        this.lastReadSender = -1;
    }

    /**
     * Finds a sender that currently has frames available, blocking until one does or until no
     * more data can arrive.
     *
     * <p>Senders are scanned starting after the previously returned sender and wrapping around to
     * index 0. When no sender has data:
     * <ul>
     *   <li>if any sender has failed, a warning is logged and either an exception is thrown
     *       (when the last reported failure was flagged local) or {@code -1} is returned;</li>
     *   <li>otherwise the channels of all senders that reached end-of-stream are closed, and if
     *       every sender is then closed {@code -1} is returned;</li>
     *   <li>otherwise the call waits for the next notification and rescans.</li>
     * </ul>
     *
     * @return the index of a sender with at least one available frame, or {@code -1} if a sender
     *         failed (non-local) or all senders are closed
     * @throws HyracksDataException with {@code ErrorCode.LOCAL_NETWORK_ERROR} on a local failure,
     *         or wrapping the {@link InterruptedException} if the wait is interrupted
     */
    public synchronized int findNextSender() throws HyracksDataException {
        while (true) {
            this.lastReadSender = this.frameAvailability.nextSetBit(this.lastReadSender + 1);
            if (this.lastReadSender < 0) {
                // Nothing after the previous position: wrap around and scan from the start.
                this.lastReadSender = this.frameAvailability.nextSetBit(0);
            }
            if (this.lastReadSender >= 0) {
                assert this.availableFrameCounts[this.lastReadSender] > 0;
                return this.lastReadSender;
            }
            if (!this.failSenders.isEmpty()) {
                LOGGER.warn("Sender failed.. returning silently");
                if (this.localFailure) {
                    throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR, new Serializable[0]);
                }
                return -1;
            }
            // Close every channel whose sender has finished and move it from "eos" to "closed".
            for (int sender = this.eosSenders.nextSetBit(0); sender >= 0;
                    sender = this.eosSenders.nextSetBit(sender + 1)) {
                this.channels[sender].close();
                this.eosSenders.clear(sender);
                this.closedSenders.set(sender);
            }
            // nextClearBit never returns a negative index, so only the upper bound matters.
            if (this.closedSenders.nextClearBit(0) >= this.nSenderPartitions) {
                // Every sender is closed: nothing more will ever arrive.
                this.lastReadSender = -1;
                return this.lastReadSender;
            }
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw HyracksDataException.create(e);
            }
        }
    }

    /**
     * Closes and forgets the channel of every sender that is not yet marked closed and for which
     * a channel has been registered.
     *
     * @throws HyracksDataException if closing a channel fails
     */
    public synchronized void close() throws HyracksDataException {
        for (int sender = this.closedSenders.nextClearBit(0); sender < this.nSenderPartitions;
                sender = this.closedSenders.nextClearBit(sender + 1)) {
            if (this.channels[sender] != null) {
                this.channels[sender].close();
                this.channels[sender] = null;
            }
        }
    }

    /**
     * Records a failure of the sender behind the given channel and wakes up all waiting readers.
     *
     * <p>The sender is marked both failed and end-of-stream. The local-failure flag is
     * overwritten on every call, so the most recently reported failure decides its value.
     *
     * @param iInputChannel the failed channel; its attachment must be the {@link PartitionId}
     *        set in {@link #addPartition}
     * @param i error code; {@code -1} is recorded as a local failure
     *        (NOTE(review): presumably a named constant of the channel layer - confirm)
     */
    public synchronized void notifyFailure(IInputChannel iInputChannel, int i) {
        PartitionId partitionId = (PartitionId) iInputChannel.getAttachment();
        int senderIndex = partitionId.getSenderIndex();
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Failure: " + partitionId.getConnectorDescriptorId() + " sender: " + senderIndex + " receiver: " + partitionId.getReceiverIndex());
        }
        this.localFailure = i == -1;
        this.failSenders.set(senderIndex);
        this.eosSenders.set(senderIndex);
        notifyAll();
    }

    /**
     * Adds {@code i} frames to the available count of the sender behind the given channel, marks
     * the sender as having data, and wakes up all waiting readers.
     *
     * @param iInputChannel the channel reporting data; its attachment must be the
     *        {@link PartitionId} set in {@link #addPartition}
     * @param i number of newly available frames
     */
    public synchronized void notifyDataAvailability(IInputChannel iInputChannel, int i) {
        PartitionId partitionId = (PartitionId) iInputChannel.getAttachment();
        int senderIndex = partitionId.getSenderIndex();
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Data available: " + partitionId.getConnectorDescriptorId() + " sender: " + senderIndex + " receiver: " + partitionId.getReceiverIndex());
        }
        this.availableFrameCounts[senderIndex] += i;
        this.frameAvailability.set(senderIndex);
        notifyAll();
    }

    /**
     * Marks the sender behind the given channel as having reached end-of-stream and wakes up all
     * waiting readers. The channel itself is closed later, by {@link #findNextSender()}.
     *
     * @param iInputChannel the channel that reached end-of-stream; its attachment must be the
     *        {@link PartitionId} set in {@link #addPartition}
     */
    public synchronized void notifyEndOfStream(IInputChannel iInputChannel) {
        PartitionId partitionId = (PartitionId) iInputChannel.getAttachment();
        int senderIndex = partitionId.getSenderIndex();
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("EOS: " + partitionId);
        }
        this.eosSenders.set(senderIndex);
        notifyAll();
    }
}
