package io.debezium.connector.postgresql;

import io.debezium.doc.FixFor;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.embedded.KafkaConnectUtil;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenKafkaVersion;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/DebeziumEngineIT.class */
public class DebeziumEngineIT {

    @Rule
    public SkipTestRule skipTest = new SkipTestRule();
    private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumEngineIT.class);
    protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("connector-offsets.txt").toAbsolutePath();
    private static final AtomicInteger offsetStoreSetCalls = new AtomicInteger();

    /* loaded from: input_file:io/debezium/connector/postgresql/DebeziumEngineIT$TestOffsetStore.class */
    public static class TestOffsetStore extends FileOffsetBackingStore {
        public TestOffsetStore() {
            super(KafkaConnectUtil.converterForOffsetStore());
        }

        public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> collection) {
            DebeziumEngineIT.LOGGER.info("Get offsets called");
            return super.get(collection);
        }

        public Future<Void> set(Map<ByteBuffer, ByteBuffer> map, Callback<Void> callback) {
            DebeziumEngineIT.LOGGER.info("Set offsets called");
            DebeziumEngineIT.offsetStoreSetCalls.incrementAndGet();
            return super.set(map, callback);
        }
    }

    @Before
    public void before() throws SQLException {
        OFFSET_STORE_PATH.getParent().toFile().mkdirs();
        OFFSET_STORE_PATH.toFile().delete();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE SCHEMA engine;", "CREATE TABLE engine.test (id INT PRIMARY KEY, val VARCHAR(32));", "INSERT INTO engine.test VALUES(1, 'value1');");
    }

    @Test
    @FixFor({"DBZ-1807"})
    public void shouldSerializeToJson() throws Exception {
        Properties properties = new Properties();
        properties.putAll(TestHelper.defaultConfig().build().asMap());
        properties.setProperty("name", "debezium-engine");
        properties.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        properties.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        properties.setProperty("offset.flush.interval.ms", "0");
        properties.setProperty("converter.schemas.enable", "false");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        DebeziumEngine build = DebeziumEngine.create(Json.class).using(properties).notifying((list, recordCommitter) -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                ChangeEvent changeEvent = (ChangeEvent) it.next();
                Assertions.assertThat((String) changeEvent.key()).isNotNull();
                Assertions.assertThat((String) changeEvent.value()).isNotNull();
                try {
                    Document read = DocumentReader.defaultReader().read((String) changeEvent.key());
                    Document read2 = DocumentReader.defaultReader().read((String) changeEvent.value());
                    Assertions.assertThat(read.getInteger("id")).isEqualTo(1);
                    Assertions.assertThat(read2.getDocument("after").getInteger("id")).isEqualTo(1);
                    Assertions.assertThat(read2.getDocument("after").getString("val")).isEqualTo("value1");
                    countDownLatch.countDown();
                    recordCommitter.markProcessed(changeEvent);
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            }
            recordCommitter.markBatchFinished();
        }).using(getClass().getClassLoader()).build();
        try {
            newFixedThreadPool.execute(() -> {
                LoggingContext.forConnector(getClass().getSimpleName(), "debezium-engine", "engine");
                build.run();
            });
            countDownLatch.await(5000L, TimeUnit.MILLISECONDS);
            Assertions.assertThat(countDownLatch.getCount()).isEqualTo(0L);
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-1807"})
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void shouldSerializeToAvro() throws Exception {
        Properties properties = new Properties();
        properties.putAll(TestHelper.defaultConfig().build().asMap());
        properties.setProperty("name", "debezium-engine");
        properties.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        properties.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        properties.setProperty("offset.flush.interval.ms", "0");
        properties.setProperty("converter.schema.registry.url", "http://localhost:" + TestHelper.defaultJdbcConfig().getPort());
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        DebeziumEngine build = DebeziumEngine.create(Avro.class).using(properties).notifying((list, recordCommitter) -> {
            Assert.fail("Should not be invoked due to serialization error");
        }).using(new DebeziumEngine.CompletionCallback() { // from class: io.debezium.connector.postgresql.DebeziumEngineIT.1
            public void handle(boolean z, String str, Throwable th) {
                Assertions.assertThat(z).isFalse();
                Assertions.assertThat(str).contains(new CharSequence[]{"Failed to serialize Avro data from topic test_server.engine.test"});
                countDownLatch.countDown();
            }
        }).build();
        try {
            newFixedThreadPool.execute(() -> {
                LoggingContext.forConnector(getClass().getSimpleName(), "debezium-engine", "engine");
                build.run();
            });
            countDownLatch.await(5000L, TimeUnit.MILLISECONDS);
            Assertions.assertThat(countDownLatch.getCount()).isEqualTo(0L);
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-1807"})
    public void shouldSerializeToCloudEvents() throws Exception {
        Properties properties = new Properties();
        properties.putAll(TestHelper.defaultConfig().build().asMap());
        properties.setProperty("name", "debezium-engine");
        properties.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        properties.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        properties.setProperty("offset.flush.interval.ms", "0");
        properties.setProperty("converter.schemas.enable", "false");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        DebeziumEngine build = DebeziumEngine.create(Json.class, CloudEvents.class).using(properties).notifying((list, recordCommitter) -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                ChangeEvent changeEvent = (ChangeEvent) it.next();
                try {
                    Assertions.assertThat(DocumentReader.defaultReader().read((String) changeEvent.key()).getInteger("id")).isEqualTo(1);
                    Assertions.assertThat((String) changeEvent.value()).isNotNull();
                    Document read = DocumentReader.defaultReader().read((String) changeEvent.value());
                    Assertions.assertThat(read.getString("id")).contains(new CharSequence[]{"txId"});
                    Assertions.assertThat(read.getDocument("data").getDocument("payload").getDocument("after").getInteger("id")).isEqualTo(1);
                    Assertions.assertThat(read.getDocument("data").getDocument("payload").getDocument("after").getString("val")).isEqualTo("value1");
                    countDownLatch.countDown();
                    recordCommitter.markProcessed(changeEvent);
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            }
            recordCommitter.markBatchFinished();
        }).using(getClass().getClassLoader()).build();
        try {
            newFixedThreadPool.execute(() -> {
                LoggingContext.forConnector(getClass().getSimpleName(), "debezium-engine", "engine");
                build.run();
            });
            countDownLatch.await(5000L, TimeUnit.MILLISECONDS);
            Assertions.assertThat(countDownLatch.getCount()).isEqualTo(0L);
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-2461"})
    public void testOffsetsCommitAfterStop() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        TestHelper.execute("DROP TABLE IF EXISTS tests;", "CREATE TABLE tests (id SERIAL PRIMARY KEY);");
        Properties properties = new Properties();
        properties.putAll(TestHelper.defaultConfig().build().asMap());
        properties.setProperty("name", "debezium-engine");
        properties.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        properties.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        properties.setProperty("offset.flush.interval.ms", "3000");
        properties.setProperty("converter.schemas.enable", "false");
        properties.setProperty("slot.drop.on.stop", "false");
        properties.setProperty("offset.storage", TestOffsetStore.class.getName());
        Runnable build = DebeziumEngine.create(Json.class).using(properties).using(new DebeziumEngine.ConnectorCallback() { // from class: io.debezium.connector.postgresql.DebeziumEngineIT.2
            public void connectorStarted() {
            }

            public void connectorStopped() {
            }
        }).using((z, str, th) -> {
            atomicReference.compareAndSet(null, th);
        }).notifying((list, recordCommitter) -> {
            try {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    recordCommitter.markProcessed((ChangeEvent) it.next());
                }
                recordCommitter.markBatchFinished();
            } catch (Exception e) {
                Testing.printError(e);
            }
        }).build();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        newSingleThreadExecutor.execute(build);
        while (offsetStoreSetCalls.get() < 1) {
            TestHelper.execute("INSERT INTO tests VALUES(default)", new String[0]);
        }
        build.close();
        Assertions.assertThat(offsetStoreSetCalls.get()).isGreaterThanOrEqualTo(1);
        offsetStoreSetCalls.set(0);
        for (int i = 0; i < 100; i++) {
            TestHelper.execute("INSERT INTO tests VALUES(default)", new String[0]);
        }
        newSingleThreadExecutor.execute(build);
        while (offsetStoreSetCalls.get() < 1) {
            TestHelper.execute("INSERT INTO tests VALUES(default)", new String[0]);
        }
        build.close();
        Assertions.assertThat(offsetStoreSetCalls.get()).isGreaterThanOrEqualTo(1);
        Assertions.assertThat((Throwable) atomicReference.get()).isNull();
    }
}
