package io.dingodb.exec.operator;

import io.dingodb.common.log.LogUtils;
import io.dingodb.exec.channel.SendEndpoint;
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.fin.Fin;
import io.dingodb.exec.fin.FinWithException;
import io.dingodb.exec.operator.data.Context;
import io.dingodb.exec.operator.params.SendParam;
import io.dingodb.exec.tuple.TupleId;
import io.dingodb.net.BufferOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink operator that buffers pushed tuples in the vertex's {@link SendParam} and ships them
 * through the parameter's {@link SendEndpoint} in batches.
 *
 * <p>Tuples accumulate in {@code SendParam#getTupleList()} until {@link #SEND_BATCH_SIZE} is
 * reached; {@link #fin(Fin, Vertex)} flushes whatever is left and then sends the FIN marker.
 * The operator itself is a stateless singleton; all mutable state lives in the
 * {@link SendParam}, and access to the buffered tuple list is guarded by the vertex monitor.
 */
public final class SendOperator extends SinkOperator {
    private static final Logger log = LoggerFactory.getLogger(SendOperator.class);
    public static final SendOperator INSTANCE = new SendOperator();

    /** Number of buffered tuples that triggers a send in {@link #push}. */
    public static final int SEND_BATCH_SIZE = 256;

    private SendOperator() {
    }

    /**
     * Buffers one tuple and sends the buffered batch once it holds {@link #SEND_BATCH_SIZE}
     * tuples.
     *
     * @param context push context; when it is non-null and carries a distribution, the tuple is
     *                tagged with that distribution's id and the context's index id
     * @param objArr  the tuple to send
     * @param vertex  vertex whose parameter is the {@link SendParam} holding the buffer
     * @return {@code true} while the tuple was only buffered, otherwise the result of
     *         {@link SendEndpoint#send}
     * @throws RuntimeException wrapping the {@link IOException} raised while encoding or sending
     */
    @Override
    public boolean push(Context context, Object[] objArr, Vertex vertex) {
        synchronized (vertex) {
            try {
                SendParam sendParam = (SendParam) vertex.getParam();
                TupleId.TupleIdBuilder builder = TupleId.builder();
                if (context != null && context.getDistribution() != null) {
                    builder.partId(context.getDistribution().getId()).indexId(context.getIndexId());
                }
                List<TupleId> buffered = sendParam.getTupleList();
                buffered.add(builder.tuple(objArr).build());
                if (buffered.size() < SEND_BATCH_SIZE) {
                    return true;
                }
                return sendTupleList(sendParam);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Flushes any still-buffered tuples (unless the FIN carries an exception) and then sends the
     * encoded FIN through the endpoint.
     *
     * <p>An {@link IOException} from encoding or flushing is logged and not rethrown; in that
     * case the FIN is not sent.
     *
     * @param fin    the FIN marker to forward
     * @param vertex vertex whose parameter is the {@link SendParam} to use
     */
    @Override
    public void fin(Fin fin, Vertex vertex) {
        try {
            SendParam sendParam = (SendParam) vertex.getParam();
            SendEndpoint endpoint = sendParam.getEndpoint();
            BufferOutputStream outputStream = endpoint.getOutputStream(sendParam.getMaxBufferSize());
            sendParam.getCodec().encodeFin(outputStream, fin);
            if (!(fin instanceof FinWithException)) {
                // push() mutates the tuple list under the vertex monitor; take the same lock so
                // the final flush cannot interleave with a concurrent push on this vertex.
                synchronized (vertex) {
                    sendTupleList(sendParam);
                }
            }
            LogUtils.debug(log, "Send FIN with detail:\n{}", fin.detail());
            endpoint.send(outputStream, true);
        } catch (IOException e) {
            LogUtils.error(log, "Encode FIN failed. fin = {}", fin, e);
        }
    }

    /**
     * Encodes and sends every buffered tuple, then empties the buffer.
     *
     * <p>The encoding is chosen from the first buffered tuple: if it has a part id the full
     * {@link TupleId}s are encoded, otherwise only the raw tuples are. If the encoded payload
     * exceeds the current max buffer size, the parameter's max buffer size is raised to match.
     * Callers are expected to hold the vertex monitor.
     *
     * @param sendParam parameter holding the endpoint, codec and tuple buffer
     * @return {@code true} when nothing was buffered, otherwise the endpoint's send result
     * @throws IOException if encoding fails; the buffer is left untouched in that case
     */
    private boolean sendTupleList(SendParam sendParam) throws IOException {
        List<TupleId> tupleList = sendParam.getTupleList();
        if (tupleList.isEmpty()) {
            return true;
        }
        SendEndpoint endpoint = sendParam.getEndpoint();
        int maxBufferSize = sendParam.getMaxBufferSize();
        BufferOutputStream outputStream = endpoint.getOutputStream(maxBufferSize);
        if (tupleList.get(0).getPartId() != null) {
            sendParam.getCodec().encodeTupleIds(outputStream, tupleList);
        } else {
            List<Object[]> tuples = tupleList.stream()
                .map(TupleId::getTuple)
                .collect(Collectors.toList());
            sendParam.getCodec().encodeTuples(outputStream, tuples);
        }
        if (outputStream.bytes() > maxBufferSize) {
            sendParam.setMaxBufferSize(outputStream.bytes());
        }
        boolean sent = endpoint.send(outputStream);
        tupleList.clear();
        return sent;
    }
}
