package org.apache.shardingsphere.data.pipeline.cdc.client.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.class */
public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(CDCRequestHandler.class);
    private final Consumer<List<DataRecordResult.Record>> consumer;
    private final ExceptionHandler exceptionHandler;
    private final ServerErrorResultHandler errorResultHandler;

    public void channelRegistered(ChannelHandlerContext channelHandlerContext) {
        ClientConnectionContext clientConnectionContext = new ClientConnectionContext();
        clientConnectionContext.getStatus().set(ClientConnectionStatus.NOT_LOGGED_IN);
        channelHandlerContext.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(clientConnectionContext);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent((Object) null);
        log.info("Channel inactive, stop CDC client");
        channelHandlerContext.fireChannelInactive();
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        CDCResponse cDCResponse = (CDCResponse) obj;
        ClientConnectionContext clientConnectionContext = (ClientConnectionContext) channelHandlerContext.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
        Optional ofNullable = Optional.ofNullable(clientConnectionContext.getResponseFutureMap().get(cDCResponse.getRequestId()));
        if (cDCResponse.getStatus() != CDCResponse.Status.SUCCEED) {
            CDCRequest.Type type = CDCRequest.Type.UNKNOWN;
            if (ofNullable.isPresent()) {
                ResponseFuture responseFuture = (ResponseFuture) ofNullable.get();
                responseFuture.setErrorCode(cDCResponse.getErrorCode());
                responseFuture.setErrorMessage(cDCResponse.getErrorMessage());
                responseFuture.countDown();
                type = responseFuture.getRequestType();
            }
            this.errorResultHandler.handleServerError(channelHandlerContext, new ServerErrorResult(cDCResponse.getErrorCode(), cDCResponse.getErrorMessage(), type));
            ofNullable.ifPresent(responseFuture2 -> {
                responseFuture2.setErrorCode(cDCResponse.getErrorCode());
                responseFuture2.setErrorMessage(cDCResponse.getErrorMessage());
                responseFuture2.countDown();
            });
            return;
        }
        if (cDCResponse.hasServerGreetingResult()) {
            ServerGreetingResult serverGreetingResult = cDCResponse.getServerGreetingResult();
            log.info("Received server greeting result, serverVersion={}, protocolVersion={}", serverGreetingResult.getServerVersion(), serverGreetingResult.getProtocolVersion());
            return;
        }
        if (ClientConnectionStatus.NOT_LOGGED_IN == clientConnectionContext.getStatus().get() && ofNullable.isPresent() && CDCRequest.Type.LOGIN == ((ResponseFuture) ofNullable.get()).getRequestType()) {
            ofNullable.ifPresent((v0) -> {
                v0.countDown();
            });
            clientConnectionContext.getStatus().set(ClientConnectionStatus.LOGGED_IN);
            return;
        }
        if (cDCResponse.hasStreamDataResult()) {
            StreamDataResult streamDataResult = cDCResponse.getStreamDataResult();
            ofNullable.ifPresent(responseFuture3 -> {
                responseFuture3.setResult(cDCResponse.getStreamDataResult().getStreamingId());
            });
            clientConnectionContext.getStreamingIds().add(streamDataResult.getStreamingId());
        } else if (cDCResponse.hasDataRecordResult()) {
            processDataRecords(channelHandlerContext, cDCResponse.getDataRecordResult());
        }
        ofNullable.ifPresent((v0) -> {
            v0.countDown();
        });
    }

    private void processDataRecords(ChannelHandlerContext channelHandlerContext, DataRecordResult dataRecordResult) {
        this.consumer.accept(dataRecordResult.getRecordList());
        channelHandlerContext.channel().writeAndFlush(CDCRequest.newBuilder().setType(CDCRequest.Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(dataRecordResult.getAckId()).build()).build());
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        this.exceptionHandler.handleException(channelHandlerContext, th);
    }

    @Generated
    public CDCRequestHandler(Consumer<List<DataRecordResult.Record>> consumer, ExceptionHandler exceptionHandler, ServerErrorResultHandler serverErrorResultHandler) {
        this.consumer = consumer;
        this.exceptionHandler = exceptionHandler;
        this.errorResultHandler = serverErrorResultHandler;
    }
}
