package io.dingodb.exec.operator;

import io.dingodb.common.log.LogUtils;
import io.dingodb.common.partition.RangeDistribution;
import io.dingodb.common.profile.OperatorProfile;
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.fin.Fin;
import io.dingodb.exec.fin.FinWithException;
import io.dingodb.exec.fin.FinWithProfiles;
import io.dingodb.exec.operator.data.Context;
import io.dingodb.exec.operator.params.ReceiveParam;
import io.dingodb.exec.tuple.TupleId;
import io.dingodb.exec.utils.QueueUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dingodb/exec/operator/ReceiveOperator.class */
/**
 * Source operator that drains the tuple queue of a {@link ReceiveParam} and forwards each
 * tuple along the vertex's sole outgoing edge until a {@link Fin} marker is taken from the queue.
 *
 * <p>The class holds no instance fields; all per-task state is read from the {@link Vertex}
 * parameter. Use the shared {@link #INSTANCE}.
 */
public final class ReceiveOperator extends SourceOperator {
    private static final Logger log = LoggerFactory.getLogger(ReceiveOperator.class);
    public static final ReceiveOperator INSTANCE = new ReceiveOperator();

    private ReceiveOperator() {
    }

    /**
     * Forwards the end-of-stream marker to the parent implementation.
     *
     * <p>If the vertex's {@link ReceiveParam} already holds a {@link FinWithException}
     * (stored by {@link #push}), that stored object is forwarded instead of {@code fin}.
     * Otherwise, when {@code fin} is a {@link FinWithProfiles}, the vertex is added to it
     * before it is forwarded.
     *
     * @param pin    pin index, passed through unchanged to {@code super.fin}
     * @param fin    the incoming FIN marker
     * @param vertex the vertex whose parameter is a {@link ReceiveParam}
     */
    @Override
    public void fin(int pin, Fin fin, Vertex vertex) {
        Fin received = ((ReceiveParam) vertex.getParam()).getFinObj();
        if (received instanceof FinWithException) {
            // A FIN carrying an exception was taken from the queue; it replaces the incoming one.
            super.fin(pin, received, vertex);
            return;
        }
        if (fin instanceof FinWithProfiles) {
            ((FinWithProfiles) fin).addProfile(vertex);
        }
        super.fin(pin, fin, vertex);
    }

    /**
     * Takes tuples from the receive queue and pushes them to the next operator until a
     * {@link Fin} is taken out.
     *
     * <p>For every non-FIN tuple the context's index id (when present) and distribution
     * (possibly {@code null}) are set from the {@link TupleId} before the tuple is handed on.
     * When the FIN is a {@link FinWithProfiles} the "receive" profile is added to it; when it
     * is a {@link FinWithException} it is stored on the parameter for {@link #fin} to pick up.
     *
     * @param context execution context updated per tuple
     * @param vertex  the vertex whose parameter is a {@link ReceiveParam}
     * @return always {@code false}
     */
    @Override
    public boolean push(Context context, Vertex vertex) {
        ReceiveParam param = (ReceiveParam) vertex.getParam();
        long count = 0;
        OperatorProfile profile = param.getProfile("receive");
        profile.start();
        Object[] tuple;
        while (true) {
            TupleId tupleId = (TupleId) QueueUtils.forceTake(param.getTupleQueue());
            tuple = tupleId.getTuple();
            if (tuple[0] instanceof Fin) {
                break;
            }
            RangeDistribution distribution = null;
            if (tupleId.getPartId() != null) {
                distribution = RangeDistribution.builder().id(tupleId.getPartId()).build();
            }
            if (tupleId.getIndexId() != null) {
                context.setIndexId(tupleId.getIndexId());
            }
            count++;
            // Arguments are evaluated before the call, so without this guard every tuple
            // would be formatted even when debug logging is switched off.
            if (log.isDebugEnabled()) {
                LogUtils.debug(log, "(tag = {}) Take out tuple {} from receiving queue.",
                    param.getTag(), param.getSchema().format(tuple));
            }
            context.setDistribution(distribution);
            if (!vertex.getSoleEdge().transformToNext(context, tuple)) {
                // Downstream declined the tuple: stop the endpoint, but stay in the loop
                // until the FIN is taken from the queue.
                param.getEndpoint().stop();
            }
        }
        LogUtils.debug(log, "(tag = {}) Take out FIN.", param.getTag());
        profile.setCount(count);
        Fin fin = (Fin) tuple[0];
        if (fin instanceof FinWithProfiles) {
            ((FinWithProfiles) fin).addProfile(profile);
        } else if (fin instanceof FinWithException) {
            // Kept on the parameter so fin() can forward it in place of the incoming FIN.
            param.setFinObj(fin);
        }
        return false;
    }
}
