package io.dingodb.net.netty.channel;

import io.dingodb.common.Location;
import io.dingodb.common.codec.PrimitiveCodec;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.concurrent.LinkedRunner;
import io.dingodb.common.util.Parameters;
import io.dingodb.net.Channel;
import io.dingodb.net.Message;
import io.dingodb.net.MessageListener;
import io.dingodb.net.netty.api.ApiRegistryImpl;
import io.dingodb.net.netty.connection.Connection;
import io.dingodb.net.netty.handler.TagMessageHandler;
import io.dingodb.net.netty.packet.Command;
import io.dingodb.net.netty.packet.Type;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dingodb/net/netty/channel/Channel.class */
public class Channel implements io.dingodb.net.Channel {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Channel.class);
    private static final long WAIT_THREAD_TIME = TimeUnit.MILLISECONDS.toNanos(2);
    private static final ApiRegistryImpl API_REGISTRY = ApiRegistryImpl.instance();
    private static final MessageListener EMPTY_MESSAGE_LISTENER = (message, channel) -> {
        log.warn("Receive message, but listener is empty.");
    };
    private static final Consumer<io.dingodb.net.Channel> EMPTY_CLOSE_LISTENER = channel -> {
    };
    protected final long channelId;
    protected final Connection connection;
    protected final Consumer<Long> onClose;
    protected LinkedRunner runner;
    private int closeRetry = 300;
    private Consumer<ByteBuffer> directListener = null;
    private MessageListener messageListener = null;
    private Consumer<io.dingodb.net.Channel> closeListener = EMPTY_CLOSE_LISTENER;
    protected Channel.Status status = Channel.Status.ACTIVE;

    public Channel(long j, Connection connection, LinkedRunner linkedRunner, Consumer<Long> consumer) {
        this.channelId = j;
        this.connection = connection;
        this.onClose = consumer;
        this.runner = linkedRunner;
    }

    public ByteBuf buffer(Type type, int i) {
        int i2 = i + 8 + 1;
        return this.connection.alloc().buffer(i2 + 4, i2 + 4).writeInt(i2).writeLong(this.channelId).writeByte(type.ordinal());
    }

    @Override // io.dingodb.net.Channel, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.status == Channel.Status.CLOSE) {
            log.warn("Channel [{}] already close", Long.valueOf(this.channelId));
            return;
        }
        shutdown();
        try {
            sendAsync(buffer(Type.COMMAND, 1).writeByte(Command.CLOSE.code()));
        } catch (Exception e) {
            log.error("Send close message error.", (Throwable) e);
        }
    }

    public synchronized void shutdown() {
        if (this.status == Channel.Status.CLOSE) {
            return;
        }
        this.status = Channel.Status.CLOSE;
        this.runner.forceFollow(() -> {
            this.onClose.accept(Long.valueOf(this.channelId));
        });
        this.runner.forceFollow(() -> {
            this.closeListener.accept(this);
        });
        this.runner = null;
    }

    @Override // io.dingodb.net.Channel
    public synchronized void setMessageListener(MessageListener messageListener) {
        this.messageListener = (MessageListener) Parameters.cleanNull(messageListener, EMPTY_MESSAGE_LISTENER);
    }

    @Override // io.dingodb.net.Channel
    public synchronized void setCloseListener(Consumer<io.dingodb.net.Channel> consumer) {
        if (isClosed()) {
            this.runner.forceFollow(() -> {
                this.closeListener.accept(this);
            });
        } else {
            this.closeListener = (Consumer) Parameters.cleanNull(consumer, EMPTY_CLOSE_LISTENER);
        }
    }

    @Override // io.dingodb.net.Channel
    public Location localLocation() {
        return this.connection.localLocation();
    }

    @Override // io.dingodb.net.Channel
    public Location remoteLocation() {
        return this.connection.remoteLocation();
    }

    @Override // io.dingodb.net.Channel
    public void send(Message message) {
        send(message, false);
    }

    @Override // io.dingodb.net.Channel
    public void send(Message message, boolean z) {
        if (isClosed()) {
            throw new RuntimeException("The channel is closed");
        }
        byte[] encode = message.encode();
        if (log.isTraceEnabled()) {
            log.trace("Send message to [{}] on [{}].", remoteLocation().getUrl(), Long.valueOf(this.channelId));
        }
        if (z) {
            try {
                send(buffer(Type.USER_DEFINE, encode.length).writeBytes(encode));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } else {
            try {
                sendAsync(buffer(Type.USER_DEFINE, encode.length).writeBytes(encode));
            } catch (Exception e2) {
                log.error("Send message to {} on {} error.", remoteLocation().getUrl(), Long.valueOf(this.channelId), e2);
            }
        }
    }

    public void send(ByteBuf byteBuf) throws InterruptedException {
        this.connection.send(byteBuf);
    }

    public void sendAsync(ByteBuf byteBuf) {
        this.connection.sendAsync(byteBuf);
    }

    public void receive(ByteBuffer byteBuffer) {
        if (this.status != Channel.Status.ACTIVE || this.runner.follow(() -> {
            processMessage(byteBuffer);
        })) {
            return;
        }
        log.error("Channel [{}] concurrent receive.", Long.valueOf(this.channelId));
    }

    private void processMessage(ByteBuffer byteBuffer) {
        try {
            switch (Type.values()[byteBuffer.get()]) {
                case USER_DEFINE:
                    if (this.directListener == null) {
                        Message decode = Message.decode(byteBuffer);
                        if (this.messageListener != null) {
                            this.messageListener.onMessage(decode, this);
                        }
                        TagMessageHandler.instance().handler(decode, this);
                        break;
                    } else {
                        this.directListener.accept(byteBuffer);
                        return;
                    }
                case COMMAND:
                    processCommand(byteBuffer);
                    break;
                case API:
                    API_REGISTRY.invoke(this, byteBuffer);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + Type.values()[byteBuffer.get()]);
            }
        } catch (Exception e) {
            log.error("Process message failed.", (Throwable) e);
        }
    }

    private void processCommand(ByteBuffer byteBuffer) {
        Command command = Command.values()[byteBuffer.get()];
        switch (command) {
            case PONG:
                if (log.isTraceEnabled()) {
                    log.trace("Channel [{}] receive pong command.", Long.valueOf(this.channelId));
                    return;
                }
                return;
            case ACK:
                if (log.isTraceEnabled()) {
                    log.trace("Channel [{}] receive ack command.", Long.valueOf(this.channelId));
                    return;
                }
                return;
            case PING:
                if (log.isTraceEnabled()) {
                    log.trace("Channel [{}] receive ping command.", Long.valueOf(this.channelId));
                }
                sendAsync(buffer(Type.COMMAND, 1).writeByte(Command.PONG.code()));
                return;
            case CLOSE:
                if (log.isTraceEnabled()) {
                    log.trace("Channel [{}] receive close command.", Long.valueOf(this.channelId));
                }
                shutdown();
                Executors.execute(this.channelId + "-channel-close", () -> {
                    this.onClose.accept(Long.valueOf(this.channelId));
                });
                return;
            case ERROR:
                log.error("Receive error: {}.", PrimitiveCodec.readString(byteBuffer));
                return;
            default:
                throw new IllegalStateException("Unexpected value: " + command);
        }
    }

    public int closeRetry() {
        return this.closeRetry;
    }

    public Consumer<Long> onClose() {
        return this.onClose;
    }

    public LinkedRunner runner() {
        return this.runner;
    }

    public Consumer<ByteBuffer> directListener() {
        return this.directListener;
    }

    public MessageListener messageListener() {
        return this.messageListener;
    }

    public Consumer<io.dingodb.net.Channel> closeListener() {
        return this.closeListener;
    }

    @Override // io.dingodb.net.Channel
    public long channelId() {
        return this.channelId;
    }

    public Connection connection() {
        return this.connection;
    }

    @Override // io.dingodb.net.Channel
    public Channel.Status status() {
        return this.status;
    }

    public Channel directListener(Consumer<ByteBuffer> consumer) {
        this.directListener = consumer;
        return this;
    }
}
