package org.apache.pinot.query.runtime.operator;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.common.datablock.MetadataBlock;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.class */
/**
 * Operator that polls the receiving mailboxes of the sending-stage instances and hands back the
 * data blocks it receives wrapped as {@link TransferableBlock}s.
 *
 * <p>For a {@code SINGLETON} exchange only the sending instance whose hostname and mailbox port
 * match the local {@link MailboxService} is polled; for every other exchange type all supplied
 * instances are polled.
 *
 * <p>Once an upstream error block has been received it is remembered and returned by every
 * subsequent call.
 */
public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
    private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
    private final MailboxService<Mailbox.MailboxContent> _mailboxService;
    private final RelDistribution.Type _exchangeType;
    // Stored by the constructor; not read anywhere within this class.
    private final KeySelector<Object[], Object[]> _keySelector;
    private final List<ServerInstance> _sendingStageInstances;
    private final DataSchema _dataSchema;
    private final String _hostName;
    private final int _port;
    private final long _jobId;
    private final int _stageId;
    // Added to System.nanoTime() to form the polling deadline, i.e. treated as nanoseconds.
    private final long _timeout;
    // First error block received from upstream, or null while none has been seen.
    private TransferableBlock _upstreamErrorBlock;

    /**
     * Creates a receive operator.
     *
     * @param mailboxService service used to look up receiving mailboxes; its hostname and mailbox
     *     port also identify the local instance for singleton exchanges
     * @param dataSchema schema attached to the end-of-stream block
     * @param sendingStageInstances instances of the sending stage
     * @param exchangeType exchange type; {@code SINGLETON} restricts polling to the local instance
     * @param keySelector key selector (only stored by this class)
     * @param hostName receiver hostname used when building mailbox ids
     * @param port receiver port used when building mailbox ids
     * @param jobId job id used when building mailbox ids
     * @param stageId stage id used when building mailbox ids
     * @throws IllegalStateException if more than one sending instance matches the local mailbox
     *     service for a singleton exchange
     */
    public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataSchema dataSchema, List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port, long jobId, int stageId) {
        this._dataSchema = dataSchema;
        this._mailboxService = mailboxService;
        this._exchangeType = exchangeType;
        if (exchangeType == RelDistribution.Type.SINGLETON) {
            ServerInstance singletonInstance = null;
            for (ServerInstance candidate : sendingStageInstances) {
                if (candidate.getHostname().equals(mailboxService.getHostname()) && candidate.getQueryMailboxPort() == mailboxService.getMailboxPort()) {
                    Preconditions.checkState(singletonInstance == null, "multiple instance found for singleton exchange type!");
                    singletonInstance = candidate;
                }
            }
            if (singletonInstance == null) {
                // No sender is co-located with this receiver. Poll nothing instead of storing a
                // null element, which would raise (and log) a NullPointerException on every poll.
                this._sendingStageInstances = Collections.emptyList();
            } else {
                this._sendingStageInstances = Collections.singletonList(singletonInstance);
            }
        } else {
            this._sendingStageInstances = sendingStageInstances;
        }
        this._hostName = hostName;
        this._port = port;
        this._jobId = jobId;
        this._stageId = stageId;
        this._timeout = QueryConfig.DEFAULT_TIMEOUT_NANO;
        this._upstreamErrorBlock = null;
        this._keySelector = keySelector;
    }

    /**
     * Returns the child operators of this operator.
     *
     * @return always {@code null}; callers must not assume a non-null list
     */
    @Override // org.apache.pinot.core.common.Operator
    public List<Operator> getChildOperators() {
        return null;
    }

    /**
     * Returns the name used for this operator in explain output.
     *
     * @return the constant {@code "MAILBOX_RECEIVE"}
     */
    @Override // org.apache.pinot.core.common.Operator
    @Nullable
    public String toExplainString() {
        return EXPLAIN_NAME;
    }

    /**
     * Polls every non-closed sending mailbox until a usable block arrives, all mailboxes are
     * closed, or the timeout elapses.
     *
     * <p>Decompiler note: this method was renamed from {@code getNextBlock} because the decompiler
     * could not resolve a name collision; the name is kept as found.
     *
     * @return the remembered or newly received upstream error block; otherwise the first received
     *     data block with at least one row; otherwise an end-of-stream block if polling stopped
     *     before the deadline, or an execution-timeout error block if the deadline passed
     */
    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.pinot.core.operator.BaseOperator
    /* renamed from: getNextBlock */
    public TransferableBlock getNextBlock2() {
        if (this._upstreamErrorBlock != null) {
            return this._upstreamErrorBlock;
        }
        final long deadlineNanos = System.nanoTime() + this._timeout;
        boolean hasOpenMailbox = true;
        // nanoTime values must be compared by difference, never directly, so the check stays
        // correct if the counter wraps around.
        while (hasOpenMailbox && System.nanoTime() - deadlineNanos < 0) {
            hasOpenMailbox = false;
            for (ServerInstance sendingInstance : this._sendingStageInstances) {
                try {
                    ReceivingMailbox<Mailbox.MailboxContent> mailbox = this._mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
                    if (mailbox.isClosed()) {
                        continue;
                    }
                    // At least one mailbox is still open, so another polling round is warranted.
                    hasOpenMailbox = true;
                    Mailbox.MailboxContent content = mailbox.receive();
                    if (content == null) {
                        continue;
                    }
                    ByteBuffer payload = content.getPayload().asReadOnlyByteBuffer();
                    if (!payload.hasRemaining()) {
                        continue;
                    }
                    BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(payload);
                    if ((dataBlock instanceof MetadataBlock) && !dataBlock.getExceptions().isEmpty()) {
                        // Remember the error so later calls return it without polling again.
                        this._upstreamErrorBlock = TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
                        return this._upstreamErrorBlock;
                    }
                    if (dataBlock.getNumberOfRows() > 0) {
                        return new TransferableBlock(dataBlock);
                    }
                    // Blocks without rows (and without exceptions) are skipped.
                } catch (Exception e) {
                    // Best effort: log the failure and keep polling the remaining mailboxes.
                    LOGGER.error("Error receiving data from mailbox {}", sendingInstance, e);
                }
            }
        }
        if (System.nanoTime() - deadlineNanos < 0) {
            return TransferableBlockUtils.getEndOfStreamTransferableBlock(this._dataSchema);
        }
        LOGGER.error("Timed out after polling mailboxes: {}", this._sendingStageInstances);
        return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
    }

    /**
     * Returns the exchange type this operator was constructed with.
     *
     * @return the exchange type
     */
    public RelDistribution.Type getExchangeType() {
        return this._exchangeType;
    }

    /**
     * Builds the mailbox id string for the channel from the given sending instance to this
     * receiver, using {@code "<jobId>_<stageId>"} as the job identifier part.
     *
     * @param serverInstance the sending instance
     * @return the string form of the mailbox identifier
     */
    private String toMailboxId(ServerInstance serverInstance) {
        String jobStageId = String.format("%s_%s", this._jobId, this._stageId);
        return new StringMailboxIdentifier(jobStageId, serverInstance.getHostname(), serverInstance.getQueryMailboxPort(), this._hostName, this._port).toString();
    }
}
