package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.transaction.xa.Xid;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XidGenerator;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.class */
/**
 * Sink writer that writes {@link SeaTunnelRow}s through a {@link JdbcOutputFormat} inside XA
 * transactions driven by an {@link XaFacade}.
 *
 * <p>Life cycle as implemented here:
 * <ul>
 *   <li>Resources are opened lazily on the first {@link #write} or {@link #prepareCommit} call;
 *       at that point transactions referenced by the first recovered state (if any) are handed
 *       to {@link XaGroupOps#recoverAndRollback} and a first transaction is started.</li>
 *   <li>{@link #prepareCommit()} flushes the output format, ends and prepares the current
 *       transaction, starts the next one and returns the prepared xid.</li>
 *   <li>{@link #close()} attempts to fail/roll back the transaction that is still in progress
 *       and releases the XA facade and the xid generator.</li>
 * </ul>
 *
 * <p>Note: the serializable lambda created in the constructor makes the compiler generate the
 * synthetic {@code $deserializeLambda$} method; it must not be declared in source.
 */
public class JdbcExactlyOnceSinkWriter implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkWriter.class);

    private final SinkWriter.Context sinkContext;
    private final JobContext context;
    /** States handed to the constructor; only the first element is consulted when opening. */
    private final List<JdbcSinkState> recoverStates;
    private final XaFacade xaFacade;
    private final XaGroupOps xaGroupOps;
    private final XidGenerator xidGenerator;
    private final JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;

    /** Set as soon as the first open attempt starts (see {@link #tryOpen()}). */
    private transient boolean isOpen;
    /** Xid of the transaction currently receiving rows; {@code null} while none is started. */
    private transient Xid currentXid;
    /** Xid of the most recently prepared transaction; {@code null} until the first prepare. */
    private transient Xid prepareXid;

    /**
     * Creates the writer and wires the XA facade, output format and XA group operations.
     * No connection is opened here; that happens lazily in {@link #tryOpen()}.
     *
     * @param context              sink writer context, forwarded to xid generation and recovery
     * @param jobContext           job context, forwarded to xid generation and recovery
     * @param jdbcStatementBuilder statement builder passed to the batch statement executor
     * @param jdbcSinkOptions      sink options; must have {@code maxRetries == 0} and
     *                             exactly-once enabled
     * @param list                 previously snapshotted states to recover from (may be empty)
     * @throws IllegalArgumentException if {@code maxRetries} is not 0
     * @throws IllegalStateException    if exactly-once is not enabled in the options
     */
    public JdbcExactlyOnceSinkWriter(SinkWriter.Context context, JobContext jobContext, JdbcStatementBuilder<SeaTunnelRow> jdbcStatementBuilder, JdbcSinkOptions jdbcSinkOptions, List<JdbcSinkState> list) {
        Preconditions.checkArgument(jdbcSinkOptions.getJdbcConnectionOptions().getMaxRetries() == 0, "JDBC XA sink requires maxRetries equal to 0, otherwise it could cause duplicates.");
        this.context = jobContext;
        this.sinkContext = context;
        this.recoverStates = list;
        this.xidGenerator = XidGenerator.semanticXidGenerator();
        Preconditions.checkState(jdbcSinkOptions.isExactlyOnce(), "is_exactly_once config error");
        this.xaFacade = XaFacade.fromJdbcConnectionOptions(jdbcSinkOptions.getJdbcConnectionOptions());
        this.outputFormat = new JdbcOutputFormat<>(
                this.xaFacade,
                jdbcSinkOptions.getJdbcConnectionOptions(),
                () -> new SimpleBatchStatementExecutor(jdbcSinkOptions.getJdbcConnectionOptions().getQuery(), jdbcStatementBuilder));
        this.xaGroupOps = new XaGroupOpsImpl(this.xaFacade);
    }

    /**
     * Opens the xid generator, XA facade and output format on first use, triggers recovery for
     * the first recovered state (if any) and starts the first transaction.
     *
     * <p>The {@code isOpen} flag is raised before the resources are opened, so a failed open is
     * not retried by later calls.
     *
     * @throws IOException if any of the open/recovery/start steps fails
     */
    private void tryOpen() throws IOException {
        if (this.isOpen) {
            return;
        }
        this.isOpen = true;
        try {
            this.xidGenerator.open();
            this.xaFacade.open();
            this.outputFormat.open();
            if (!this.recoverStates.isEmpty()) {
                this.xaGroupOps.recoverAndRollback(this.context, this.sinkContext, this.xidGenerator, this.recoverStates.get(0).getXid());
            }
            beginTx();
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        }
    }

    /**
     * Returns a single-element state list holding the most recently prepared xid.
     *
     * @param j checkpoint identifier supplied by the framework; not used by this implementation
     * @return singleton list with a {@link JdbcSinkState} wrapping the prepared xid
     * @throws IllegalStateException if no transaction has been prepared yet
     */
    @Override
    public List<JdbcSinkState> snapshotState(long j) {
        Preconditions.checkState(this.prepareXid != null, "prepare xid must not be null");
        return Collections.singletonList(new JdbcSinkState(this.prepareXid));
    }

    /**
     * Writes one row into the current transaction, opening the writer first if necessary.
     * The row is cloned before it is handed to the output format.
     *
     * @param seaTunnelRow row to write
     * @throws IOException           if opening or writing fails
     * @throws IllegalStateException if no transaction is in progress after opening
     */
    @Override
    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        tryOpen();
        Preconditions.checkState(this.currentXid != null, "current xid must not be null");
        this.outputFormat.writeRecord(SerializationUtils.clone(seaTunnelRow));
    }

    /**
     * Prepares the current transaction, immediately starts the next one and returns the
     * prepared xid as commit information.
     *
     * @return the prepared xid wrapped in an {@link XidInfo} (second constructor argument 0)
     * @throws IOException           if flushing, preparing or starting a transaction fails
     * @throws IllegalStateException if no prepared xid is available afterwards
     */
    @Override
    public Optional<XidInfo> prepareCommit() throws IOException {
        tryOpen();
        prepareCurrentTx();
        // The prepared transaction is no longer "current"; beginTx() requires a cleared slot.
        this.currentXid = null;
        beginTx();
        Preconditions.checkState(this.prepareXid != null, "prepare xid must not be null");
        return Optional.of(new XidInfo(this.prepareXid, 0));
    }

    /** Intentionally a no-op in this implementation. */
    @Override
    public void abortPrepare() {
    }

    /**
     * Attempts to fail/roll back the in-progress transaction (best effort, failures are only
     * logged), then closes the XA facade and the xid generator and clears the tracked xids.
     *
     * <p>The xid generator is closed and the xids are cleared even when closing the XA facade
     * fails, so a failing facade does not leak the remaining resources.
     *
     * @throws IOException if closing the XA facade or the xid generator fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.currentXid != null && this.xaFacade.isOpen()) {
                try {
                    LOG.debug("remove current transaction before closing, xid={}", this.currentXid);
                    this.xaFacade.failAndRollback(this.currentXid);
                } catch (Exception e) {
                    LOG.warn("unable to fail/rollback current transaction, xid={}", this.currentXid, e);
                }
            }
            try {
                this.xaFacade.close();
            } catch (Exception e) {
                ExceptionUtils.rethrowIOException(e);
            }
        } finally {
            try {
                this.xidGenerator.close();
            } finally {
                this.currentXid = null;
                this.prepareXid = null;
            }
        }
    }

    /**
     * Generates a new xid from the job/sink contexts and the current wall-clock time and starts
     * an XA transaction for it.
     *
     * @throws IOException           if starting the transaction fails
     * @throws IllegalStateException if a transaction is still marked as current
     */
    private void beginTx() throws IOException {
        Preconditions.checkState(this.currentXid == null, "currentXid not null");
        this.currentXid = this.xidGenerator.generateXid(this.context, this.sinkContext, System.currentTimeMillis());
        try {
            this.xaFacade.start(this.currentXid);
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        }
    }

    /**
     * Flushes buffered rows, then ends and prepares the current transaction and remembers its
     * xid as the prepared one.
     *
     * @throws IOException           if flushing or preparing fails
     * @throws IllegalStateException if there is no current transaction
     */
    private void prepareCurrentTx() throws IOException {
        Preconditions.checkState(this.currentXid != null, "no current xid");
        this.outputFormat.flush();
        try {
            this.xaFacade.endAndPrepare(this.currentXid);
            this.prepareXid = this.currentXid;
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        }
    }
}
