package io.debezium.connector.postgresql.transforms.timescaledb;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.TestHelper;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.testing.testcontainers.ImageNames;
import java.sql.SQLException;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;

/* loaded from: input_file:io/debezium/connector/postgresql/transforms/timescaledb/TimescaleDbDatabaseTest.class */
public class TimescaleDbDatabaseTest extends AbstractConnectorTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(TimescaleDbDatabaseTest.class);
    private static final Network network = Network.newNetwork();
    public static final PostgreSQLContainer<?> timescaleDbContainer = new PostgreSQLContainer(ImageNames.TIMESCALE_DB_IMAGE_NAME).withNetwork(network).withNetworkAliases(new String[]{"postgres"}).withUsername("postgres").withPassword("postgres").withDatabaseName("postgres").withReuse(false).withCopyToContainer(Transferable.of("#!/bin/bash\n\necho \"wal_level=logical\" >> ${POSTGRESQL_CONF_DIR}/postgresql.conf"), "docker-entrypoint-initdb.d/002_enable_replication.sh");
    private PostgresConnection connection;
    private Configuration config;

    @Before
    public void prepareDatabase() throws Exception {
        Startables.deepStart(new Startable[]{timescaleDbContainer}).join();
        JdbcConfiguration.Builder defaultJdbcConfigBuilder = TestHelper.defaultJdbcConfigBuilder();
        defaultJdbcConfigBuilder.with(JdbcConfiguration.HOSTNAME, timescaleDbContainer.getHost());
        defaultJdbcConfigBuilder.with(JdbcConfiguration.PORT, timescaleDbContainer.getMappedPort(5432));
        this.connection = new PostgresConnection(defaultJdbcConfigBuilder.build(), TestHelper.CONNECTION_TEST);
        dropPublication(this.connection);
        this.connection.execute(new String[]{"DROP TABLE IF EXISTS conditions", "CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL);", "SELECT create_hypertable('conditions', 'time');", "CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert,update')"});
        this.config = TestHelper.defaultConfig().with(PostgresConnectorConfig.HOSTNAME, timescaleDbContainer.getHost()).with(PostgresConnectorConfig.PORT, timescaleDbContainer.getMappedPort(5432)).with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA).with(PostgresConnectorConfig.PLUGIN_NAME, PostgresConnectorConfig.LogicalDecoder.PGOUTPUT).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "_timescaledb_internal").with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with("transforms", "timescaledb").with("transforms.timescaledb.type", TimescaleDb.class.getName()).with("transforms.timescaledb.database.hostname", timescaleDbContainer.getHost()).with("transforms.timescaledb.database.port", timescaleDbContainer.getMappedPort(5432)).with("transforms.timescaledb.database.user", "postgres").with("transforms.timescaledb.database.password", "postgres").with("transforms.timescaledb.database.dbname", "postgres").build();
    }

    @After
    public void dropDatabase() {
        timescaleDbContainer.stop();
    }

    protected void insertData() throws SQLException {
        this.connection.execute(new String[]{"INSERT INTO conditions VALUES (now(), 'Loc 1', 30, 50)", "INSERT INTO conditions VALUES (now(), 'Loc 1', 35, 55)", "INSERT INTO conditions VALUES (now(), 'Loc 1', 40, 60)"});
    }

    @Test
    public void shouldTransformChunks() throws Exception {
        start(PostgresConnector.class, this.config);
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        insertData();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(3);
        assertConnectorIsRunning();
        Assertions.assertThat(consumeRecordsByTopic.topics()).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("timescaledb.public.conditions")).hasSize(3);
        stopConnector();
    }

    @Test
    public void shouldTransformAggregates() throws Exception {
        start(PostgresConnector.class, this.config);
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        insertData();
        this.connection.execute(new String[]{"  CREATE MATERIALIZED VIEW conditions_summary\n        WITH (timescaledb.continuous) AS\n        SELECT location,\n           time_bucket(INTERVAL '1 hour', time) AS bucket,\n           AVG(temperature),\n           MAX(temperature),\n           MIN(temperature)\n        FROM conditions\n        GROUP BY location, bucket;\n"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(4);
        assertConnectorIsRunning();
        Assertions.assertThat(consumeRecordsByTopic.topics()).hasSize(2);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("timescaledb.public.conditions_summary")).hasSize(1);
        stopConnector();
    }

    @Test
    public void shouldTransformCompressedChunks() throws Exception {
        start(PostgresConnector.class, this.config);
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        insertData();
        this.connection.execute(new String[]{"ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_orderby = 'time DESC', timescaledb.compress_segmentby = 'location')", "SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(4);
        assertConnectorIsRunning();
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("timescaledb.public.conditions")).hasSize(3);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("timescaledb._timescaledb_internal._compressed_hypertable_2")).hasSize(1);
        stopConnector();
    }

    private void dropPublication(PostgresConnection postgresConnection) {
        try {
            postgresConnection.execute(new String[]{"DROP PUBLICATION dbz_publication"});
        } catch (Exception e) {
            LOGGER.debug("Error while dropping publication", e);
        }
    }
}
