package org.apache.pinot.core.operator.streaming;

import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.combine.CombineOperatorUtils;
import org.apache.pinot.core.query.exception.EarlyTerminationException;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.class */
public class StreamingSelectionOnlyCombineOperator extends BaseOperator<IntermediateResultsBlock> {
    private static final Logger LOGGER;
    private static final String OPERATOR_NAME = "StreamingSelectionOnlyCombineOperator";
    private static final IntermediateResultsBlock LAST_RESULTS_BLOCK;
    private final List<Operator> _operators;
    private final QueryContext _queryContext;
    private final ExecutorService _executorService;
    private final long _endTimeMs;
    private final StreamObserver<Server.ServerResponse> _streamObserver;
    private final int _limit;
    static final /* synthetic */ boolean $assertionsDisabled;

    public StreamingSelectionOnlyCombineOperator(List<Operator> list, QueryContext queryContext, ExecutorService executorService, long j, StreamObserver<Server.ServerResponse> streamObserver) {
        this._operators = list;
        this._queryContext = queryContext;
        this._executorService = executorService;
        this._endTimeMs = j;
        this._streamObserver = streamObserver;
        this._limit = queryContext.getLimit();
    }

    @Override // org.apache.pinot.core.operator.BaseOperator
    public String getOperatorName() {
        return OPERATOR_NAME;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.pinot.core.operator.BaseOperator
    public IntermediateResultsBlock getNextBlock() {
        final int size = this._operators.size();
        final int numThreadsForQuery = CombineOperatorUtils.getNumThreadsForQuery(size);
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        final Phaser phaser = new Phaser(1);
        Future[] futureArr = new Future[numThreadsForQuery];
        for (int i = 0; i < numThreadsForQuery; i++) {
            final int i2 = i;
            futureArr[i] = this._executorService.submit(new TraceRunnable() { // from class: org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyCombineOperator.1
                static final /* synthetic */ boolean $assertionsDisabled;

                @Override // org.apache.pinot.core.util.trace.TraceRunnable
                public void runJob() {
                    try {
                        if (phaser.register() < 0) {
                            phaser.arriveAndDeregister();
                            return;
                        }
                        int i3 = 0;
                        int i4 = i2;
                        while (i4 < size) {
                            Operator operator = (Operator) StreamingSelectionOnlyCombineOperator.this._operators.get(i4);
                            do {
                                try {
                                    IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) operator.nextBlock();
                                    if (intermediateResultsBlock != null) {
                                        Collection<Object[]> selectionResult = intermediateResultsBlock.getSelectionResult();
                                        if (!$assertionsDisabled && selectionResult == null) {
                                            throw new AssertionError();
                                        }
                                        i3 += selectionResult.size();
                                        linkedBlockingQueue.offer(intermediateResultsBlock);
                                    } else {
                                        linkedBlockingQueue.offer(StreamingSelectionOnlyCombineOperator.LAST_RESULTS_BLOCK);
                                        i4 += numThreadsForQuery;
                                    }
                                } catch (EarlyTerminationException e) {
                                    phaser.arriveAndDeregister();
                                    return;
                                } catch (Exception e2) {
                                    StreamingSelectionOnlyCombineOperator.LOGGER.error("Caught exception while executing operator of index: {} (query: {})", new Object[]{Integer.valueOf(i4), StreamingSelectionOnlyCombineOperator.this._queryContext, e2});
                                    linkedBlockingQueue.offer(new IntermediateResultsBlock(e2));
                                    phaser.arriveAndDeregister();
                                    return;
                                }
                            } while (i3 < StreamingSelectionOnlyCombineOperator.this._limit);
                            phaser.arriveAndDeregister();
                            return;
                        }
                        phaser.arriveAndDeregister();
                    } catch (Throwable th) {
                        phaser.arriveAndDeregister();
                        throw th;
                    }
                }

                static {
                    $assertionsDisabled = !StreamingSelectionOnlyCombineOperator.class.desiredAssertionStatus();
                }
            });
        }
        int i3 = 0;
        int i4 = 0;
        while (i3 < this._limit && i4 < size) {
            try {
                try {
                    IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) linkedBlockingQueue.poll(this._endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    if (intermediateResultsBlock == null) {
                        LOGGER.error("Timed out while polling results block (query: {})", this._queryContext);
                        IntermediateResultsBlock intermediateResultsBlock2 = new IntermediateResultsBlock((Exception) QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, new TimeoutException("Timed out while polling results block")));
                        for (Future future : futureArr) {
                            if (!future.isDone()) {
                                future.cancel(true);
                            }
                        }
                        phaser.awaitAdvance(phaser.arriveAndDeregister());
                        return intermediateResultsBlock2;
                    }
                    if (intermediateResultsBlock.getProcessingExceptions() != null) {
                        for (Future future2 : futureArr) {
                            if (!future2.isDone()) {
                                future2.cancel(true);
                            }
                        }
                        phaser.awaitAdvance(phaser.arriveAndDeregister());
                        return intermediateResultsBlock;
                    }
                    if (intermediateResultsBlock == LAST_RESULTS_BLOCK) {
                        i4++;
                    } else {
                        DataSchema dataSchema = intermediateResultsBlock.getDataSchema();
                        Collection<Object[]> selectionResult = intermediateResultsBlock.getSelectionResult();
                        if (!$assertionsDisabled && (dataSchema == null || selectionResult == null)) {
                            throw new AssertionError();
                        }
                        i3 += selectionResult.size();
                        this._streamObserver.onNext(StreamingResponseUtils.getDataResponse(SelectionOperatorUtils.getDataTableFromRows(selectionResult, dataSchema)));
                    }
                } catch (Exception e) {
                    LOGGER.error("Caught exception while streaming results blocks (query: {})", this._queryContext, e);
                    IntermediateResultsBlock intermediateResultsBlock3 = new IntermediateResultsBlock(QueryException.INTERNAL_ERROR, e);
                    for (Future future3 : futureArr) {
                        if (!future3.isDone()) {
                            future3.cancel(true);
                        }
                    }
                    phaser.awaitAdvance(phaser.arriveAndDeregister());
                    return intermediateResultsBlock3;
                }
            } catch (Throwable th) {
                for (Future future4 : futureArr) {
                    if (!future4.isDone()) {
                        future4.cancel(true);
                    }
                }
                phaser.awaitAdvance(phaser.arriveAndDeregister());
                throw th;
            }
        }
        IntermediateResultsBlock intermediateResultsBlock4 = new IntermediateResultsBlock();
        CombineOperatorUtils.setExecutionStatistics(intermediateResultsBlock4, this._operators);
        for (Future future5 : futureArr) {
            if (!future5.isDone()) {
                future5.cancel(true);
            }
        }
        phaser.awaitAdvance(phaser.arriveAndDeregister());
        return intermediateResultsBlock4;
    }

    static {
        $assertionsDisabled = !StreamingSelectionOnlyCombineOperator.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(StreamingSelectionOnlyCombineOperator.class);
        LAST_RESULTS_BLOCK = new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), Collections.emptyList());
    }
}
