package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenKafkaVersion;
import io.debezium.util.Collect;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/**
 * Integration test checking the transaction metadata emitted by {@link PostgresConnector} when
 * {@code PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA} is enabled.
 *
 * <p>The test streams a single transaction touching the tables {@code s1.a} and {@code s2.a} and
 * expects four records in order: a {@code BEGIN} event, two data change records carrying a
 * {@code transaction} block, and an {@code END} event with per-collection event counts.
 */
@SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
public class TransactionMetadataIT extends AbstractAsyncEngineConnectorTest {

    /** One insert into each of the two test tables. */
    private static final String INSERT_STMT = "INSERT INTO s1.a (aa) VALUES (1);"
            + "INSERT INTO s2.a (aa) VALUES (1);";

    /** Recreates both schemas and tables, then seeds each table using {@link #INSERT_STMT}. */
    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;"
            + "DROP SCHEMA IF EXISTS s2 CASCADE;"
            + "CREATE SCHEMA s1; "
            + "CREATE SCHEMA s2; "
            + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));"
            + "CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));"
            + INSERT_STMT;

    @Rule
    public final TestRule skip = new SkipTestDependingOnDecoderPluginNameRule();

    /** Drops all schemas once before any test of this class runs. */
    @BeforeClass
    public static void beforeClass() throws SQLException {
        TestHelper.dropAllSchemas();
    }

    /** Initializes the connector test framework before each test. */
    @Before
    public void before() {
        initializeConnectorTestFramework();
    }

    /** Stops the connector, then removes the default replication slot and the publication. */
    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    /**
     * Starts the connector with the {@code NO_DATA} snapshot mode and transaction metadata enabled,
     * executes {@link #INSERT_STMT} and verifies the BEGIN / data / data / END sequence.
     *
     * @throws InterruptedException declared by the original test signature
     */
    @Test
    public void transactionMetadata() throws InterruptedException {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig()
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA.getValue())
                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
                .with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
                .build());
        assertConnectorIsRunning();
        TestHelper.waitForDefaultReplicationSlotBeActive();

        // Nothing must be emitted before the streamed insert below.
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();

        TestHelper.execute(INSERT_STMT, new String[0]);

        final List<SourceRecord> records = new ArrayList<>();
        Awaitility.await("Skip empty transactions and find the data")
                .atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords() * 3))
                .until(() -> {
                    final List<SourceRecord> candidate = consumeRecordsByTopic(2).allRecordsInOrder();
                    // A pair whose second record is on the transaction topic holds no data change
                    // record (an empty transaction); discard the pair and poll again.
                    if (candidate.get(1).topic().contains("transaction")) {
                        return false;
                    }
                    // First pair is BEGIN + first data record; fetch the remaining two records.
                    records.addAll(candidate);
                    records.addAll(consumeRecordsByTopic(2).allRecordsInOrder());
                    return true;
                });

        Assertions.assertThat(records).hasSize(4);
        final String beginEventId = assertBeginTransaction(records.get(0));
        assertRecordTransactionMetadata(records.get(1), beginEventId, 1L, 1L);
        assertRecordTransactionMetadata(records.get(2), beginEventId, 2L, 1L);
        assertEndTransaction(records.get(3), beginEventId, 2L, Collect.hashMapOf("s1.a", 1, "s2.a", 1));
    }

    /**
     * Asserts that the record is a transaction {@code BEGIN} event: status {@code BEGIN}, no event
     * count, key id equal to value id, and the offset's {@code transaction_id} equal to the part of
     * the id before the first {@code ':'}.
     *
     * @param record the record expected to be the BEGIN event
     * @return the full {@code id} of the BEGIN event
     */
    protected String assertBeginTransaction(SourceRecord record) {
        final Struct begin = (Struct) record.value();
        final Struct beginKey = (Struct) record.key();
        final Map<String, ?> offset = record.sourceOffset();

        Assertions.assertThat(begin.getString("status")).isEqualTo("BEGIN");
        Assertions.assertThat(begin.getInt64("event_count")).isNull();

        final String beginEventId = begin.getString("id");
        Assertions.assertThat(beginKey.getString("id")).isEqualTo(beginEventId);

        final Object offsetTransactionId = offset.get("transaction_id");
        Assertions.assertThat(offsetTransactionId).isEqualTo(transactionIdOf(beginEventId));
        return beginEventId;
    }

    /**
     * Asserts that the record is the transaction {@code END} event matching the given BEGIN event.
     *
     * <p>The expected event id is {@code <transaction id>:<lsn>}, where the transaction id is taken
     * from {@code beginEventId} and the LSN from the record's source offset.
     *
     * @param record the record expected to be the END event
     * @param beginEventId the id returned by {@link #assertBeginTransaction(SourceRecord)}
     * @param expectedEventCount the expected total {@code event_count}
     * @param expectedPerTableCount expected {@code event_count} per {@code data_collection}
     */
    protected void assertEndTransaction(SourceRecord record, String beginEventId, long expectedEventCount,
                                        Map<String, Number> expectedPerTableCount) {
        final Struct end = (Struct) record.value();
        final Struct endKey = (Struct) record.key();
        final Map<String, ?> offset = record.sourceOffset();
        final String expectedTransactionId = transactionIdOf(beginEventId);
        final String expectedEventId = String.format("%s:%s", expectedTransactionId, offset.get("lsn"));

        Assertions.assertThat(end.getString("status")).isEqualTo("END");
        Assertions.assertThat(end.getString("id")).isEqualTo(expectedEventId);
        Assertions.assertThat(end.getInt64("event_count")).isEqualTo(expectedEventCount);
        Assertions.assertThat(endKey.getString("id")).isEqualTo(expectedEventId);

        final List<Struct> dataCollections = end.getArray("data_collections");
        final Map<String, Long> actualCounts = dataCollections.stream()
                .collect(Collectors.toMap(
                        collection -> collection.getString("data_collection"),
                        collection -> collection.getInt64("event_count")));
        // Normalize the expected numbers to Long so that map equality is not defeated by
        // Integer-vs-Long boxing differences.
        final Map<String, Long> expectedCounts = expectedPerTableCount.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> Long.valueOf(entry.getValue().longValue())));
        Assertions.assertThat(actualCounts).isEqualTo(expectedCounts);

        final Object offsetTransactionId = offset.get("transaction_id");
        Assertions.assertThat(offsetTransactionId).isEqualTo(expectedTransactionId);
    }

    /**
     * Asserts the {@code transaction} block of a data change record.
     *
     * @param record the data change record
     * @param beginEventId the id returned by {@link #assertBeginTransaction(SourceRecord)}
     * @param expectedTotalOrder the expected {@code total_order}
     * @param expectedCollectionOrder the expected {@code data_collection_order}
     */
    protected void assertRecordTransactionMetadata(SourceRecord record, String beginEventId, long expectedTotalOrder,
                                                   long expectedCollectionOrder) {
        final Struct transaction = ((Struct) record.value()).getStruct("transaction");
        final Map<String, ?> offset = record.sourceOffset();
        final String expectedTransactionId = transactionIdOf(beginEventId);
        final String expectedEventId = String.format("%s:%s", expectedTransactionId, offset.get("lsn"));

        Assertions.assertThat(transaction.getString("id")).isEqualTo(expectedEventId);
        Assertions.assertThat(transaction.getInt64("total_order")).isEqualTo(expectedTotalOrder);
        Assertions.assertThat(transaction.getInt64("data_collection_order")).isEqualTo(expectedCollectionOrder);

        final Object offsetTransactionId = offset.get("transaction_id");
        Assertions.assertThat(offsetTransactionId).isEqualTo(expectedTransactionId);
    }

    /**
     * Returns the part of a transaction event id that precedes the first {@code ':'}.
     *
     * @param eventId the {@code id} of a transaction event
     * @return the leading segment of {@code eventId}
     * @throws AssertionError if splitting on {@code ':'} yields no segment (the id consists only
     *         of separators)
     */
    private static String transactionIdOf(String eventId) {
        return Arrays.stream(eventId.split(":"))
                .findFirst()
                .orElseThrow(() -> new AssertionError(
                        "Transaction event id '" + eventId + "' does not contain a transaction id"));
    }
}
