package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrl;
import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.opengauss.PGProperty;
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.LogSequenceNumber;
import org.opengauss.replication.PGReplicationStream;
import org.opengauss.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.class */
public final class OpenGaussLogicalReplication {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(OpenGaussLogicalReplication.class);
    private static final String HA_PORT_ERROR_MESSAGE_KEY = "HA port";

    public Connection createConnection(StandardPipelineDataSourceConfiguration standardPipelineDataSourceConfiguration) throws SQLException {
        Properties properties = new Properties();
        PGProperty.USER.set(properties, standardPipelineDataSourceConfiguration.getUsername());
        PGProperty.PASSWORD.set(properties, standardPipelineDataSourceConfiguration.getPassword());
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
        try {
            return DriverManager.getConnection(standardPipelineDataSourceConfiguration.getUrl(), properties);
        } catch (SQLException e) {
            if (!failedBecauseOfNonHAPort(e)) {
                throw e;
            }
            log.info("Failed to connect to openGauss caused by: {} - {}. Try connecting to HA port.", e.getSQLState(), e.getMessage());
            return tryConnectingToHAPort(standardPipelineDataSourceConfiguration.getUrl(), properties);
        }
    }

    private boolean failedBecauseOfNonHAPort(SQLException sQLException) {
        return sQLException.getMessage().contains(HA_PORT_ERROR_MESSAGE_KEY);
    }

    private Connection tryConnectingToHAPort(String str, Properties properties) throws SQLException {
        JdbcUrl parse = new StandardJdbcUrlParser().parse(str);
        PGProperty.PG_HOST.set(properties, parse.getHostname());
        PGProperty.PG_DBNAME.set(properties, parse.getDatabase());
        PGProperty.PG_PORT.set(properties, parse.getPort() + 1);
        return DriverManager.getConnection((String) TypedSPILoader.getService(DatabaseType.class, "openGauss").getJdbcUrlPrefixes().iterator().next(), properties);
    }

    public PGReplicationStream createReplicationStream(PgConnection pgConnection, BaseLogSequenceNumber baseLogSequenceNumber, String str, int i) throws SQLException {
        ChainedLogicalStreamBuilder withStartPosition = pgConnection.getReplicationAPI().replicationStream().logical().withSlotName(str).withSlotOption("include-xids", true).withSlotOption("skip-empty-xacts", true).withStartPosition((LogSequenceNumber) baseLogSequenceNumber.get());
        return i < 3 ? withStartPosition.start() : withStartPosition.withSlotOption("parallel-decode-num", 10).withSlotOption("decode-style", "j").withSlotOption("sending-batch", 0).start();
    }
}
