package org.apache.shardingsphere.data.pipeline.cdc.client;

import com.google.common.hash.Hashing;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import java.util.List;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.CDCRequestHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ExceptionHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ServerErrorResultHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.class */
public final class CDCClient implements AutoCloseable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(CDCClient.class);
    private final CDCClientConfiguration config;
    private NioEventLoopGroup group;
    private Channel channel;

    public CDCClient(CDCClientConfiguration cDCClientConfiguration) {
        validateParameter(cDCClientConfiguration);
        this.config = cDCClientConfiguration;
    }

    private void validateParameter(CDCClientConfiguration cDCClientConfiguration) {
        if (null == cDCClientConfiguration.getAddress() || cDCClientConfiguration.getAddress().isEmpty()) {
            throw new IllegalArgumentException("The address parameter can't be null");
        }
        if (cDCClientConfiguration.getPort() <= 0) {
            throw new IllegalArgumentException("The port must be greater than 0");
        }
    }

    public void connect(final Consumer<List<DataRecordResult.Record>> consumer, final ExceptionHandler exceptionHandler, final ServerErrorResultHandler serverErrorResultHandler) {
        try {
            Bootstrap bootstrap = new Bootstrap();
            this.group = new NioEventLoopGroup(1);
            bootstrap.channel(NioSocketChannel.class).group(this.group).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).option(ChannelOption.SO_REUSEADDR, true).handler(new ChannelInitializer<NioSocketChannel>() { // from class: org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient.1
                /* JADX INFO: Access modifiers changed from: protected */
                public void initChannel(NioSocketChannel nioSocketChannel) {
                    nioSocketChannel.pipeline().addLast(new ChannelHandler[]{new ProtobufVarint32FrameDecoder()});
                    nioSocketChannel.pipeline().addLast(new ChannelHandler[]{new ProtobufDecoder(CDCResponse.getDefaultInstance())});
                    nioSocketChannel.pipeline().addLast(new ChannelHandler[]{new ProtobufVarint32LengthFieldPrepender()});
                    nioSocketChannel.pipeline().addLast(new ChannelHandler[]{new ProtobufEncoder()});
                    nioSocketChannel.pipeline().addLast(new ChannelHandler[]{new CDCRequestHandler(consumer, exceptionHandler, serverErrorResultHandler)});
                }
            });
            this.channel = bootstrap.connect(this.config.getAddress(), this.config.getPort()).sync().channel();
        } catch (InterruptedException e) {
            throw e;
        }
    }

    public void await() throws InterruptedException {
        this.channel.closeFuture().sync();
    }

    public synchronized void login(CDCLoginParameter cDCLoginParameter) {
        checkChannelActive();
        ClientConnectionContext clientConnectionContext = (ClientConnectionContext) this.channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
        if (ClientConnectionStatus.LOGGED_IN == clientConnectionContext.getStatus().get()) {
            throw new IllegalStateException("The client is already logged in");
        }
        LoginRequestBody build = LoginRequestBody.newBuilder().setType(LoginRequestBody.LoginType.BASIC).setBasicBody(LoginRequestBody.BasicBody.newBuilder().setUsername(cDCLoginParameter.getUsername()).setPassword(Hashing.sha256().hashBytes(cDCLoginParameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
        String generateRequestId = RequestIdUtils.generateRequestId();
        CDCRequest build2 = CDCRequest.newBuilder().setType(CDCRequest.Type.LOGIN).setVersion(1).setRequestId(generateRequestId).setLoginRequestBody(build).build();
        ResponseFuture responseFuture = new ResponseFuture(generateRequestId, CDCRequest.Type.LOGIN);
        clientConnectionContext.getResponseFutureMap().put(generateRequestId, responseFuture);
        this.channel.writeAndFlush(build2);
        responseFuture.waitResponseResult(this.config.getTimeoutMills(), clientConnectionContext);
        log.info("Login success, username: {}", cDCLoginParameter.getUsername());
    }

    private void checkChannelActive() {
        if (null == this.channel || !this.channel.isActive()) {
            throw new IllegalStateException("The channel is not active, call the `connect` method first");
        }
    }

    public String startStreaming(StartStreamingParameter startStreamingParameter) {
        StreamDataRequestBody build = StreamDataRequestBody.newBuilder().setDatabase(startStreamingParameter.getDatabase()).setFull(startStreamingParameter.isFull()).addAllSourceSchemaTable(startStreamingParameter.getSchemaTables()).build();
        String generateRequestId = RequestIdUtils.generateRequestId();
        CDCRequest build2 = CDCRequest.newBuilder().setRequestId(generateRequestId).setType(CDCRequest.Type.STREAM_DATA).setStreamDataRequestBody(build).build();
        ClientConnectionContext clientConnectionContext = (ClientConnectionContext) this.channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
        ResponseFuture responseFuture = new ResponseFuture(generateRequestId, CDCRequest.Type.STREAM_DATA);
        clientConnectionContext.getResponseFutureMap().put(generateRequestId, responseFuture);
        this.channel.writeAndFlush(build2);
        String obj = responseFuture.waitResponseResult(this.config.getTimeoutMills(), clientConnectionContext).toString();
        log.info("Start streaming success, streaming id: {}", obj);
        return obj;
    }

    public void restartStreaming(String str) {
        String generateRequestId = RequestIdUtils.generateRequestId();
        CDCRequest build = CDCRequest.newBuilder().setRequestId(generateRequestId).setType(CDCRequest.Type.START_STREAMING).setStartStreamingRequestBody(StartStreamingRequestBody.newBuilder().setStreamingId(str).build()).build();
        ResponseFuture responseFuture = new ResponseFuture(generateRequestId, CDCRequest.Type.START_STREAMING);
        ClientConnectionContext clientConnectionContext = (ClientConnectionContext) this.channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
        clientConnectionContext.getResponseFutureMap().put(generateRequestId, responseFuture);
        this.channel.writeAndFlush(build);
        responseFuture.waitResponseResult(this.config.getTimeoutMills(), clientConnectionContext);
        log.info("Restart streaming success, streaming id: {}", str);
    }

    public void stopStreaming(String str) {
        String generateRequestId = RequestIdUtils.generateRequestId();
        CDCRequest build = CDCRequest.newBuilder().setRequestId(generateRequestId).setType(CDCRequest.Type.STOP_STREAMING).setStopStreamingRequestBody(StopStreamingRequestBody.newBuilder().setStreamingId(str).build()).build();
        ResponseFuture responseFuture = new ResponseFuture(generateRequestId, CDCRequest.Type.STOP_STREAMING);
        ClientConnectionContext clientConnectionContext = (ClientConnectionContext) this.channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
        clientConnectionContext.getResponseFutureMap().put(generateRequestId, responseFuture);
        this.channel.writeAndFlush(build);
        responseFuture.waitResponseResult(this.config.getTimeoutMills(), clientConnectionContext);
        clientConnectionContext.getStreamingIds().remove(str);
        log.info("Stop streaming success, streaming id: {}", str);
    }

    public void dropStreaming(String str) {
        String generateRequestId = RequestIdUtils.generateRequestId();
        CDCRequest build = CDCRequest.newBuilder().setRequestId(generateRequestId).setType(CDCRequest.Type.DROP_STREAMING).setDropStreamingRequestBody(DropStreamingRequestBody.newBuilder().setStreamingId(str).build()).build();
        ResponseFuture responseFuture = new ResponseFuture(generateRequestId, CDCRequest.Type.DROP_STREAMING);
        ClientConnectionContext clientConnectionContext = (ClientConnectionContext) this.channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
        clientConnectionContext.getResponseFutureMap().put(generateRequestId, responseFuture);
        this.channel.writeAndFlush(build);
        responseFuture.waitResponseResult(this.config.getTimeoutMills(), clientConnectionContext);
        clientConnectionContext.getStreamingIds().remove(str);
        log.info("Drop streaming success, streaming id: {}", str);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (null != this.channel) {
            this.channel.close().awaitUninterruptibly();
        }
        if (null != this.group) {
            this.group.shutdownGracefully();
        }
    }
}
