/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sources.debezium;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SourceTester} for the Debezium PostgreSQL source connector.
 *
 * <p>The tester configures the connector against a PostgreSQL container registered under the
 * {@code debezium-postgresql-example} alias, and drives insert/update/delete events by running
 * {@code psql} inside that container. After each validation step it also checks that the
 * {@code confirmed_flush_lsn} reported by {@code pg_replication_slots} has changed.
 *
 * <p>The container must be supplied through {@link #setServiceContainer} before any of the
 * {@code prepare*Event} methods or {@link #doPostValidationCheck} are used.
 */
public class DebeziumPostgreSqlSourceTester
        extends SourceTester<DebeziumPostgreSqlContainer>
        implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(DebeziumPostgreSqlSourceTester.class);
    private static final String NAME = "debezium-postgres";

    /** Alias the container is started/stopped under; also used as the connector's database hostname. */
    private static final String SERVICE_NAME = "debezium-postgresql-example";

    /** Common prefix of every psql invocation; the SQL statement is appended wrapped in double quotes. */
    private static final String PSQL_PREFIX = "psql -h 127.0.0.1 -U postgres -d postgres -c ";

    private final String pulsarServiceUrl;
    private DebeziumPostgreSqlContainer debeziumPostgresqlContainer;
    private final PulsarCluster pulsarCluster;

    /** psql output of the most recent {@code confirmed_flush_lsn} query; starts with a placeholder. */
    private final AtomicReference<String> confirmedFlushLsn = new AtomicReference<>("not read yet");

    /**
     * Creates the tester and populates the connector configuration.
     *
     * @param cluster the cluster used to start the PostgreSQL service container
     */
    public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) {
        super(NAME);
        this.pulsarCluster = cluster;
        this.numEntriesToInsert = 10;
        this.pulsarServiceUrl = "pulsar://pulsar-proxy:6650";

        this.sourceConfig.put("database.hostname", SERVICE_NAME);
        this.sourceConfig.put("database.port", "5432");
        this.sourceConfig.put("database.user", "postgres");
        this.sourceConfig.put("database.password", "postgres");
        this.sourceConfig.put("database.server.id", "184055");
        this.sourceConfig.put("database.server.name", "dbserver1");
        this.sourceConfig.put("database.dbname", "postgres");
        this.sourceConfig.put("schema.whitelist", "inventory");
        this.sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom");
        this.sourceConfig.put("database.history.pulsar.service.url", this.pulsarServiceUrl);
        this.sourceConfig.put("topic.namespace", "debezium/postgresql");
    }

    /**
     * Remembers the given container and starts it on the cluster under the service alias.
     *
     * @param container the PostgreSQL container to start
     */
    @Override
    public void setServiceContainer(DebeziumPostgreSqlContainer container) {
        log.info("start debezium postgresql server container.");
        this.debeziumPostgresqlContainer = container;
        this.pulsarCluster.startService(SERVICE_NAME, this.debeziumPostgresqlContainer);
    }

    /** No preparation is performed here; the method only logs. */
    @Override
    public void prepareSource() {
        log.info("debezium postgresql server already contains preconfigured data.");
    }

    /**
     * Inserts one {@code test-debezium} row into {@code inventory.products}, then runs a
     * count/max(id) query over the matching rows. The result of neither command is inspected.
     *
     * @throws Exception if running a command in the container fails
     */
    @Override
    public void prepareInsertEvent() throws Exception {
        runPsql("insert into inventory.products(name, description, weight) "
                + "values('test-debezium', 'description', 10);");
        runPsql("select count(1), max(id) from inventory.products "
                + "where name='test-debezium' and weight=10;");
    }

    /**
     * Deletes the {@code test-debezium} rows from {@code inventory.products}, then runs a count
     * query over the matching rows. The result of neither command is inspected.
     *
     * @throws Exception if running a command in the container fails
     */
    @Override
    public void prepareDeleteEvent() throws Exception {
        runPsql("delete from inventory.products where name='test-debezium';");
        runPsql("select count(1) from inventory.products where name='test-debezium';");
    }

    /**
     * Updates description and weight of the {@code test-debezium} rows, then runs a count query
     * over the updated rows. The result of neither command is inspected.
     *
     * @throws Exception if running a command in the container fails
     */
    @Override
    public void prepareUpdateEvent() throws Exception {
        runPsql("update inventory.products set description='test-update-description', weight='20' "
                + "where name='test-debezium';");
        runPsql("select count(1) from inventory.products where name='test-debezium' and weight=20;");
    }

    /**
     * Runs the parent check, then queries {@code confirmed_flush_lsn} from
     * {@code pg_replication_slots} and asserts that the psql output differs from the output
     * recorded by the previous call (or from the initial placeholder on the first call).
     *
     * <p>The complete psql stdout is compared and stored, not a parsed LSN value.
     * NOTE(review): presumably this guards against the connector no longer acknowledging
     * replication progress -- confirm the intent with the test owners.
     *
     * @param eventType forwarded unchanged to the parent implementation
     */
    @Override
    public void doPostValidationCheck(String eventType) {
        super.doPostValidationCheck(eventType);
        try {
            ContainerExecResult res = runPsql("select confirmed_flush_lsn from pg_replication_slots;");
            res.assertNoStderr();
            String lastConfirmedFlushLsn = res.getStdout();
            String previousConfirmedFlushLsn = this.confirmedFlushLsn.get();
            log.info("Current confirmedFlushLsn: \n{} \nLast confirmedFlushLsn: \n{}",
                    previousConfirmedFlushLsn, lastConfirmedFlushLsn);
            // JUnit's Assert is the imported one; TestNG's Assert is referenced by its full name below.
            Assert.assertNotEquals(previousConfirmedFlushLsn, lastConfirmedFlushLsn);
            this.confirmedFlushLsn.set(lastConfirmedFlushLsn);
        } catch (Exception e) {
            // Only Exceptions are converted here; an AssertionError raised above propagates as is.
            org.testng.Assert.fail("failed to get flush lsn", e);
        }
    }

    /**
     * Produces no messages; the method only logs.
     *
     * @param numMessages ignored
     * @return always {@code null}
     */
    @Override
    public Map<String, String> produceSourceMessages(int numMessages) {
        log.info("debezium postgresql server already contains preconfigured data.");
        return null;
    }

    /**
     * Stops the PostgreSQL service container.
     *
     * <p>Does nothing when no cluster was supplied or when {@link #setServiceContainer} was never
     * called, so that closing a tester that never started its container does not pass a
     * {@code null} container to {@code PulsarCluster.stopService}.
     */
    @Override
    public void close() {
        if (this.pulsarCluster == null || this.debeziumPostgresqlContainer == null) {
            return;
        }
        PulsarCluster.stopService(SERVICE_NAME, this.debeziumPostgresqlContainer);
    }

    /**
     * @return the container passed to {@link #setServiceContainer}, or {@code null} if none was set
     */
    public DebeziumPostgreSqlContainer getDebeziumPostgresqlContainer() {
        return this.debeziumPostgresqlContainer;
    }

    /**
     * Runs a single SQL statement through {@code psql} inside the PostgreSQL container.
     *
     * @param sql the statement to run; it is embedded between double quotes in a bash command,
     *            so it must not itself contain unescaped double quotes
     * @return the execution result reported by the container
     * @throws Exception if running the command in the container fails
     */
    private ContainerExecResult runPsql(String sql) throws Exception {
        return this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", PSQL_PREFIX + "\"" + sql + "\"");
    }
}

