package io.dingodb.net.netty.connection;

import io.dingodb.common.Location;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.net.netty.NetServiceConfiguration;
import io.dingodb.net.netty.api.ApiRegistryImpl;
import io.dingodb.net.netty.api.HandshakeApi;
import io.dingodb.net.netty.channel.Channel;
import io.dingodb.net.netty.handler.ExceptionHandler;
import io.dingodb.net.netty.handler.MessageHandler;
import io.dingodb.net.netty.packet.Command;
import io.dingodb.net.netty.packet.Type;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dingodb/net/netty/connection/ClientConnection.class */
public class ClientConnection extends Connection {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ClientConnection.class);
    protected Bootstrap bootstrap;
    protected EventLoopGroup eventLoopGroup;

    public ClientConnection(Location location) {
        super(location, null);
    }

    public void connect() throws InterruptedException {
        this.bootstrap = new Bootstrap();
        this.eventLoopGroup = new NioEventLoopGroup(0, Executors.executor(this.remoteLocation.getUrl() + "/connection"));
        this.bootstrap.channel(NioSocketChannel.class).group(this.eventLoopGroup).remoteAddress(this.remoteLocation.toSocketAddress()).handler(channelInitializer());
        this.bootstrap.connect().sync2().await2();
        handshake();
    }

    private ChannelInitializer<SocketChannel> channelInitializer() {
        return new ChannelInitializer<SocketChannel>() { // from class: io.dingodb.net.netty.connection.ClientConnection.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) throws Exception {
                ClientConnection.this.socketChannel = socketChannel;
                socketChannel.pipeline().addLast(new MessageHandler(ClientConnection.this)).addLast(new IdleStateHandler(NetServiceConfiguration.heartbeat().intValue(), 0L, 0L, TimeUnit.SECONDS)).addLast(new ExceptionHandler(ClientConnection.this));
            }
        };
    }

    protected void handshake() throws InterruptedException {
        ((HandshakeApi) ApiRegistryImpl.instance().proxy(HandshakeApi.class, this.channel, NetServiceConfiguration.heartbeat().intValue())).handshake(null, HandshakeApi.Handshake.INSTANCE);
        log.info("Connection open, remote: [{}]", this.remoteLocation.getUrl());
        InetSocketAddress localAddress = this.socketChannel.localAddress();
        this.localLocation = new Location(localAddress.getHostName(), localAddress.getPort());
        Executors.scheduleWithFixedDelayAsync(String.format("%s-heartbeat", this.remoteLocation), this::sendHeartbeat, 0L, 1L, TimeUnit.SECONDS);
    }

    private void sendHeartbeat() {
        this.channel.sendAsync(this.channel.buffer(Type.COMMAND, 1).writeByte(Command.PING.code()));
    }

    @Override // io.dingodb.net.netty.connection.Connection
    protected Map<Long, Channel> createChannels() {
        return new ConcurrentHashMap();
    }

    @Override // io.dingodb.net.netty.connection.Connection
    protected String channelName(String str, long j) {
        return String.format("<%s/%s/client>", str, Long.valueOf(j));
    }

    @Override // io.dingodb.net.netty.connection.Connection
    public void receive(ByteBuffer byteBuffer) {
        if (byteBuffer == null) {
            return;
        }
        long j = byteBuffer.getLong();
        Channel channel = getChannel(j);
        if (channel == null) {
            log.error("Receive message, channel id is [{}], but not have channel.", Long.valueOf(j));
        } else {
            channel.receive(byteBuffer);
        }
    }

    @Override // io.dingodb.net.netty.connection.Connection
    public void close() {
        super.close();
        if (this.eventLoopGroup != null) {
            this.eventLoopGroup.shutdownGracefully();
        }
        log.info("Connection close, remote: [{}].", this.remoteLocation.getUrl());
    }
}
