package org.apache.shardingsphere.proxy.frontend.reactive.command;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.EventExecutor;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Optional;
import lombok.Generated;
import org.apache.shardingsphere.db.protocol.CommonConstants;
import org.apache.shardingsphere.db.protocol.packet.CommandPacketType;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.frontend.command.CommandExecuteEngine;
import org.apache.shardingsphere.proxy.frontend.exception.ExpectedExceptions;
import org.apache.shardingsphere.proxy.frontend.reactive.command.executor.ReactiveCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.reactive.spi.ReactiveDatabaseProtocolFrontendEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/proxy/frontend/reactive/command/ReactiveCommandExecuteTask.class */
public final class ReactiveCommandExecuteTask implements Runnable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReactiveCommandExecuteTask.class);
    private final ReactiveDatabaseProtocolFrontendEngine reactiveDatabaseProtocolFrontendEngine;
    private final ConnectionSession connectionSession;
    private final ChannelHandlerContext context;
    private final Object message;
    private volatile boolean isNeedFlush;
    private volatile boolean writeInEventLoop;

    @Override // java.lang.Runnable
    public void run() {
        PacketPayload createPacketPayload = this.reactiveDatabaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) this.message, (Charset) this.context.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
        try {
            ((Future) this.connectionSession.getBackendConnection().prepareForTaskExecution()).compose(r6 -> {
                return executeCommand(createPacketPayload).eventually(r5 -> {
                    return closeResources(createPacketPayload);
                });
            }).onFailure(this::processThrowable);
        } catch (Exception e) {
            processException(e);
            closeResources(createPacketPayload);
        }
    }

    private Future<Void> executeCommand(PacketPayload packetPayload) {
        try {
            CommandExecuteEngine commandExecuteEngine = this.reactiveDatabaseProtocolFrontendEngine.getCommandExecuteEngine();
            ReactiveCommandExecuteEngine reactiveCommandExecuteEngine = this.reactiveDatabaseProtocolFrontendEngine.getReactiveCommandExecuteEngine();
            CommandPacketType commandPacketType = commandExecuteEngine.getCommandPacketType(packetPayload);
            ReactiveCommandExecutor reactiveCommandExecutor = reactiveCommandExecuteEngine.getReactiveCommandExecutor(commandPacketType, commandExecuteEngine.getCommandPacket(packetPayload, commandPacketType, this.connectionSession), this.connectionSession);
            return reactiveCommandExecutor.executeFuture().compose(this::handleResponsePackets).eventually(r3 -> {
                return reactiveCommandExecutor.closeFuture();
            });
        } catch (SQLException e) {
            throw e;
        }
    }

    private Future<Void> handleResponsePackets(Collection<DatabasePacket<?>> collection) {
        ChannelHandlerContext channelHandlerContext = this.context;
        channelHandlerContext.getClass();
        collection.forEach((v1) -> {
            r1.write(v1);
        });
        boolean z = !collection.isEmpty();
        this.isNeedFlush = z;
        this.writeInEventLoop = z && this.context.executor().inEventLoop();
        return Future.succeededFuture();
    }

    private Future<Void> closeResources(PacketPayload packetPayload) {
        try {
            try {
                packetPayload.close();
            } catch (BackendConnectionException e) {
                throw e;
            }
        } catch (Exception e2) {
        }
        this.connectionSession.clearQueryContext();
        return ((Future) this.connectionSession.getBackendConnection().closeExecutionResources()).onComplete(this::doFlushIfNecessary);
    }

    private void doFlushIfNecessary(AsyncResult<Void> asyncResult) {
        if (this.isNeedFlush) {
            if (this.writeInEventLoop || !this.context.executor().inEventLoop()) {
                this.context.flush();
                return;
            }
            EventExecutor executor = this.context.executor();
            ChannelHandlerContext channelHandlerContext = this.context;
            channelHandlerContext.getClass();
            executor.execute(channelHandlerContext::flush);
        }
    }

    private void processThrowable(Throwable th) {
        processException(th instanceof Exception ? (Exception) th : new Exception(th));
    }

    private void processException(Exception exc) {
        if (!ExpectedExceptions.isExpected(exc.getClass())) {
            log.error("Exception occur: ", exc);
        }
        this.context.write(this.reactiveDatabaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(exc));
        Optional otherPacket = this.reactiveDatabaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket(this.connectionSession);
        ChannelHandlerContext channelHandlerContext = this.context;
        channelHandlerContext.getClass();
        otherPacket.ifPresent((v1) -> {
            r1.write(v1);
        });
        this.context.flush();
    }

    @Generated
    public ReactiveCommandExecuteTask(ReactiveDatabaseProtocolFrontendEngine reactiveDatabaseProtocolFrontendEngine, ConnectionSession connectionSession, ChannelHandlerContext channelHandlerContext, Object obj) {
        this.reactiveDatabaseProtocolFrontendEngine = reactiveDatabaseProtocolFrontendEngine;
        this.connectionSession = connectionSession;
        this.context = channelHandlerContext;
        this.message = obj;
    }
}
