package org.apache.paimon.flink.action.cdc.postgres;

import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.stream.Stream;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.junit.jupiter.api.AfterAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.class */
public class PostgresActionITCaseBase extends CdcActionITCaseBase {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresActionITCaseBase.class);
    public static final String DEFAULT_DB = "postgres";
    protected static final DockerImageName PG_IMAGE = DockerImageName.parse("postgres:13").asCompatibleSubstituteFor(DEFAULT_DB);
    private static final String USER = "paimonuser";
    private static final String PASSWORD = "paimonpw";
    protected static final PostgresContainer POSTGRES_CONTAINER = ((PostgresContainer) ((PostgresContainer) ((PostgresContainer) new PostgresContainer(PG_IMAGE).withDatabaseName(DEFAULT_DB)).withUsername(USER)).withPassword(PASSWORD)).withEnv("TZ", "America/Los_Angeles").withLogConsumer(new Slf4jLogConsumer(LOG)).withCommand(new String[]{DEFAULT_DB, "-c", "fsync=off", "-c", "wal_level=logical", "-c", "max_replication_slots=20", "-c", "max_wal_senders=20"});

    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase$PostgresSyncTableActionBuilder.class */
    protected class PostgresSyncTableActionBuilder extends CdcActionITCaseBase.SyncTableActionBuilder<PostgresSyncTableAction> {
        public PostgresSyncTableActionBuilder(Map<String, String> map) {
            super(PostgresSyncTableAction.class, map);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void start() {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join();
        LOG.info("Containers are started.");
    }

    @AfterAll
    public static void stopContainers() {
        LOG.info("Stopping containers...");
        POSTGRES_CONTAINER.stop();
        LOG.info("Containers are stopped.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Statement getStatement(String str) throws SQLException {
        return DriverManager.getConnection(String.format("jdbc:postgresql://%s:%s/%s", POSTGRES_CONTAINER.getHost(), Integer.valueOf(POSTGRES_CONTAINER.getDatabasePort()), str), POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword()).createStatement();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, String> getBasicPostgresConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put(PostgresSourceOptions.HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
        hashMap.put(PostgresSourceOptions.PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getDatabasePort()));
        hashMap.put(PostgresSourceOptions.USERNAME.key(), USER);
        hashMap.put(PostgresSourceOptions.PASSWORD.key(), PASSWORD);
        hashMap.put(PostgresSourceOptions.SLOT_NAME.key(), getSlotName());
        hashMap.put(PostgresSourceOptions.DECODING_PLUGIN_NAME.key(), "pgoutput");
        return hashMap;
    }

    protected String getSlotName() {
        return "paimon_" + new Random().nextInt(10000);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public PostgresSyncTableActionBuilder syncTableActionBuilder(Map<String, String> map) {
        return new PostgresSyncTableActionBuilder(map);
    }
}
