package io.dingodb.exec.channel;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.dingodb.exec.Services;
import io.dingodb.exec.channel.message.Control;
import io.dingodb.exec.channel.message.IncreaseBuffer;
import io.dingodb.exec.channel.message.StopTx;
import io.dingodb.net.Channel;
import io.dingodb.net.Message;
import io.dingodb.net.MessageListener;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dingodb/exec/channel/ReceiveEndpoint.class */
public class ReceiveEndpoint {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ReceiveEndpoint.class);
    private static final int BUFFER_LENGTH = 589824;
    private final String host;
    private final int port;
    private final String tag;
    private final Consumer<byte[]> handler;
    private boolean stopped;
    private Channel channel;
    private ReceiveMessageListener messageListener;

    /* loaded from: input_file:io/dingodb/exec/channel/ReceiveEndpoint$ReceiveMessageListener.class */
    private class ReceiveMessageListener implements MessageListener {
        private ReceiveMessageListener() {
        }

        @Override // io.dingodb.net.MessageListener
        public void onMessage(Message message, Channel channel) {
            ReceiveEndpoint.this.sendIncreaseBuffer(message.length());
            ReceiveEndpoint.this.handler.accept(message.content());
        }
    }

    public ReceiveEndpoint(String str, int i, String str2, Consumer<byte[]> consumer) {
        this.host = str;
        this.port = i;
        this.tag = str2;
        this.handler = consumer;
    }

    public void init() {
        this.channel = Services.openNewSysChannel(this.host, this.port);
        if (log.isDebugEnabled()) {
            log.debug("(tag = {}) Opened channel to {}:{}.", this.tag, this.host, Integer.valueOf(this.port));
        }
        this.messageListener = new ReceiveMessageListener();
        Services.NET.registerTagMessageListener(this.tag, this.messageListener);
        this.stopped = false;
        sendIncreaseBuffer(589824);
    }

    public void stop() {
        sendStopTx();
        this.stopped = true;
    }

    public void close() {
        Services.NET.unregisterTagMessageListener(this.tag, this.messageListener);
        this.channel.close();
        if (log.isDebugEnabled()) {
            log.debug("(tag = {}) Closed channel to {}:{}.", this.tag, this.host, Integer.valueOf(this.port));
        }
    }

    private void sendStopTx() {
        sendControl(new StopTx(this.tag));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendIncreaseBuffer(int i) {
        sendControl(new IncreaseBuffer(this.tag, i));
    }

    private void sendControl(Control control) {
        try {
            this.channel.send(new Message(Services.CTRL_TAG, control.toBytes()), false);
            if (log.isDebugEnabled()) {
                log.debug("(tag = {}) Sent control message \"{}\".", this.tag, control);
            }
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize control message: {}", control);
            throw new RuntimeException("Failed to serialize control message.", e);
        }
    }

    public boolean isStopped() {
        return this.stopped;
    }
}
