package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
import io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot;
import io.debezium.pipeline.signal.actions.snapshotting.StopSnapshot;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Strings;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.Test;

@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 13, minor = 0, reason = "Function pg_current_snapshot() not supported until PostgreSQL 13")
/* loaded from: input_file:io/debezium/connector/postgresql/ReadOnlyIncrementalSnapshotIT.class */
public class ReadOnlyIncrementalSnapshotIT extends IncrementalSnapshotIT {
    private final LogInterceptor executeSignalInterceptor = new LogInterceptor(ExecuteSnapshot.class);
    private final LogInterceptor stopSignalInterceptor = new LogInterceptor(StopSnapshot.class);

    @Override // io.debezium.connector.postgresql.IncrementalSnapshotIT
    protected Configuration.Builder config() {
        return TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1").with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "kafka").with(PostgresConnectorConfig.READ_ONLY_CONNECTION, true).with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()).with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic()).with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5).with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4").with("database.autosave", "conservative");
    }

    @Override // io.debezium.connector.postgresql.IncrementalSnapshotIT
    protected Configuration.Builder mutableConfig(boolean z, boolean z2) {
        return TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5).with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1").with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "kafka").with(PostgresConnectorConfig.READ_ONLY_CONNECTION, true).with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()).with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic()).with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4").with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, z ? "s1.b" : "s1.a,s1.b").with("database.autosave", "conservative");
    }

    protected void sendAdHocSnapshotSignal(String... strArr) {
        try {
            sendKafkaSignal(String.format("{\"id\":\"ad-hoc\", \"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", String.join("\",\"", strArr)));
        } catch (Exception e) {
            throw new RuntimeException("Failed to send signal", e);
        }
    }

    protected void sendAdHocSnapshotStopSignal(String... strArr) {
        try {
            sendKafkaSignal(strArr.length > 0 ? String.format("{\"type\":\"stop-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", String.join(",", strArr)) : "{\"type\":\"stop-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}");
        } catch (Exception e) {
            throw new RuntimeException("Failed to send signal", e);
        }
    }

    protected void sendPauseSignal() {
        try {
            sendKafkaSignal("{\"type\":\"pause-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}");
        } catch (Exception e) {
            throw new RuntimeException("Failed to send resume signal", e);
        }
    }

    protected void sendResumeSignal() {
        try {
            sendKafkaSignal("{\"type\":\"resume-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}");
        } catch (Exception e) {
            throw new RuntimeException("Failed to send pause signal", e);
        }
    }

    protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String str, String str2, AbstractSnapshotSignal.SnapshotType snapshotType, String... strArr) {
        String str3 = (String) Arrays.stream(strArr).map(str4 -> {
            return "\"" + str4 + "\"";
        }).collect(Collectors.joining(", "));
        String format = (Strings.isNullOrEmpty(str) || Strings.isNullOrEmpty(str2)) ? !Strings.isNullOrEmpty(str) ? String.format("{\"type\":\"execute-snapshot\",\"data\": {\"type\": \"%s\",\"data-collections\": [%s], \"additional-conditions\": [%s]}}", snapshotType.toString(), str3, str) : !Strings.isNullOrEmpty(str2) ? String.format("{\"type\":\"execute-snapshot\",\"data\": {\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}} `", snapshotType.toString(), str3, str2) : String.format("{\"type\":\"execute-snapshot\",\"data\": {\"type\": \"%s\",\"data-collections\": [%s]}}", snapshotType.toString(), str3) : String.format("{\"type\":\"execute-snapshot\",\"data\": {\"type\": \"%s\",\"data-collections\": [%s], \"additional-conditions\": [%s], \"surrogate-key\": %s}}", snapshotType.toString(), str3, str, str2);
        try {
            this.logger.info("Sending signal with message {}", format);
            sendKafkaSignal(format);
        } catch (Exception e) {
            throw new RuntimeException("Failed to send signal", e);
        }
    }

    protected void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map<String, String> map, String str, AbstractSnapshotSignal.SnapshotType snapshotType, String... strArr) {
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(buildAdditionalConditions(map), str, snapshotType, strArr);
    }

    protected Callable<Boolean> executeSignalWaiter() {
        return () -> {
            return Boolean.valueOf(this.executeSignalInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections"));
        };
    }

    protected Callable<Boolean> stopSignalWaiter() {
        return () -> {
            return Boolean.valueOf(this.stopSignalInterceptor.containsMessage("Requested stop of snapshot"));
        };
    }

    @Test
    public void insertInsertWatermarkingStrategy() throws Exception {
    }

    @Test
    public void insertDeleteWatermarkingStrategy() throws Exception {
    }

    @Override // io.debezium.connector.postgresql.IncrementalSnapshotIT
    protected void assertExpectedRecordsEnumPk(List<String> list) throws InterruptedException {
        waitForAvailableRecords(5000L, TimeUnit.SECONDS);
        List allRecordsInOrder = consumeRecordsByTopic(list.size()).allRecordsInOrder();
        for (int i = 0; i < list.size(); i++) {
            SourceRecord sourceRecord = (SourceRecord) allRecordsInOrder.get(i);
            Assertions.assertThat(((Struct) sourceRecord.key()).getString("pk")).isEqualTo(list.get(i));
            Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("aa")).isEqualTo(i);
        }
    }

    protected Function<Configuration.Builder, Configuration.Builder> additionalConfiguration() {
        return builder -> {
            return builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1).with("heartbeat.interval.ms", 5000);
        };
    }
}
