package org.apache.paimon.flink.action.cdc.kafka;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.utils.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.class */
public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
    private static final int zkTimeoutMills = 30000;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaActionITCaseBase.class);
    private static final Network NETWORK = Network.newNetwork();
    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";

    @RegisterExtension
    public static final KafkaContainerExtension KAFKA_CONTAINER = new KafkaContainerExtension(DockerImageName.parse("confluentinc/cp-kafka:7.2.2")) { // from class: org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase.1
        protected void doStart() {
            super.doStart();
            if (KafkaActionITCaseBase.LOG.isInfoEnabled()) {
                followOutput(new Slf4jLogConsumer(KafkaActionITCaseBase.LOG));
            }
        }
    }.withEmbeddedZookeeper().withNetwork(NETWORK).withNetworkAliases(new String[]{INTER_CONTAINER_KAFKA_ALIAS}).withEnv("KAFKA_TRANSACTION_MAX_TIMEOUT_MS", String.valueOf(Duration.ofHours(2).toMillis())).withEnv("KAFKA_LOG_RETENTION_MS", "-1");
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Timer loggingTimer = new Timer("Debug Logging Timer");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase$KafkaContainerExtension.class */
    public static class KafkaContainerExtension extends KafkaContainer implements BeforeAllCallback, AfterAllCallback {
        private KafkaContainerExtension(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        public void beforeAll(ExtensionContext extensionContext) {
            doStart();
        }

        public void afterAll(ExtensionContext extensionContext) {
            close();
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase$KafkaSyncDatabaseActionBuilder.class */
    protected class KafkaSyncDatabaseActionBuilder extends CdcActionITCaseBase.SyncDatabaseActionBuilder<KafkaSyncDatabaseAction> {
        public KafkaSyncDatabaseActionBuilder(Map<String, String> map) {
            super(map);
        }

        @Override // org.apache.paimon.flink.action.cdc.CdcActionITCaseBase.SyncDatabaseActionBuilder
        /* renamed from: ignoreIncompatible */
        public CdcActionITCaseBase.SyncDatabaseActionBuilder<KafkaSyncDatabaseAction> ignoreIncompatible2(boolean z) {
            throw new UnsupportedOperationException();
        }

        @Override // org.apache.paimon.flink.action.cdc.CdcActionITCaseBase.SyncDatabaseActionBuilder
        /* renamed from: mergeShards */
        public CdcActionITCaseBase.SyncDatabaseActionBuilder<KafkaSyncDatabaseAction> mergeShards2(boolean z) {
            throw new UnsupportedOperationException();
        }

        @Override // org.apache.paimon.flink.action.cdc.CdcActionITCaseBase.SyncDatabaseActionBuilder
        /* renamed from: withMode */
        public CdcActionITCaseBase.SyncDatabaseActionBuilder<KafkaSyncDatabaseAction> withMode2(String str) {
            throw new UnsupportedOperationException();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.paimon.flink.action.cdc.CdcActionITCaseBase.SyncDatabaseActionBuilder
        public KafkaSyncDatabaseAction build() {
            ArrayList arrayList = new ArrayList(Arrays.asList("kafka_sync_database", "--warehouse", KafkaActionITCaseBase.this.warehouse, "--database", KafkaActionITCaseBase.this.database));
            arrayList.addAll(KafkaActionITCaseBase.this.mapToArgs("--kafka-conf", this.sourceConfig));
            arrayList.addAll(KafkaActionITCaseBase.this.mapToArgs("--catalog-conf", this.catalogConfig));
            arrayList.addAll(KafkaActionITCaseBase.this.mapToArgs("--table-conf", this.tableConfig));
            arrayList.addAll(KafkaActionITCaseBase.this.nullableToArgs("--table-prefix", this.tablePrefix));
            arrayList.addAll(KafkaActionITCaseBase.this.nullableToArgs("--table-suffix", this.tableSuffix));
            arrayList.addAll(KafkaActionITCaseBase.this.nullableToArgs("--including-tables", this.includingTables));
            arrayList.addAll(KafkaActionITCaseBase.this.nullableToArgs("--excluding-tables", this.excludingTables));
            arrayList.addAll(KafkaActionITCaseBase.this.listToArgs("--type-mapping", this.typeMappingModes));
            return KafkaActionITCaseBase.this.createAction(KafkaSyncDatabaseAction.class, arrayList);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase$KafkaSyncTableActionBuilder.class */
    protected class KafkaSyncTableActionBuilder extends CdcActionITCaseBase.SyncTableActionBuilder<KafkaSyncTableAction> {
        public KafkaSyncTableActionBuilder(Map<String, String> map) {
            super(map);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.paimon.flink.action.cdc.CdcActionITCaseBase.SyncTableActionBuilder
        public KafkaSyncTableAction build() {
            ArrayList arrayList = new ArrayList(Arrays.asList("kafka_sync_table", "--warehouse", KafkaActionITCaseBase.this.warehouse, "--database", KafkaActionITCaseBase.this.database, "--table", KafkaActionITCaseBase.this.tableName));
            arrayList.addAll(KafkaActionITCaseBase.this.mapToArgs("--kafka-conf", this.sourceConfig));
            arrayList.addAll(KafkaActionITCaseBase.this.mapToArgs("--catalog-conf", this.catalogConfig));
            arrayList.addAll(KafkaActionITCaseBase.this.mapToArgs("--table-conf", this.tableConfig));
            arrayList.addAll(KafkaActionITCaseBase.this.listToArgs("--partition-keys", this.partitionKeys));
            arrayList.addAll(KafkaActionITCaseBase.this.listToArgs("--primary-keys", this.primaryKeys));
            arrayList.addAll(KafkaActionITCaseBase.this.listToArgs("--type-mapping", this.typeMappingModes));
            arrayList.addAll(KafkaActionITCaseBase.this.listToMultiArgs("--computed-column", this.computedColumnArgs));
            return KafkaActionITCaseBase.this.createAction(KafkaSyncTableAction.class, arrayList);
        }
    }

    @BeforeEach
    public void setup() {
        scheduleTimeoutLogger(Duration.ofSeconds(30L), () -> {
            Map<String, TopicDescription> describeExternalTopics = describeExternalTopics();
            LOG.info("Current existing topics: {}", describeExternalTopics.keySet());
            logTopicPartitionStatus(describeExternalTopics);
        });
    }

    @AfterEach
    public void after() throws Exception {
        super.after();
        cancelTimeoutLogger();
        deleteTopics();
    }

    private void deleteTopics() throws ExecutionException, InterruptedException {
        AdminClient create = AdminClient.create(getStandardProps());
        create.deleteTopics((Collection) create.listTopics().names().get()).all().get();
    }

    private void scheduleTimeoutLogger(Duration duration, final Runnable runnable) {
        this.loggingTimer.schedule(new TimerTask() { // from class: org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase.2
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    runnable.run();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to execute logging action", e);
                }
            }
        }, 0L, duration.toMillis());
    }

    private void cancelTimeoutLogger() {
        this.loggingTimer.cancel();
    }

    private Map<String, TopicDescription> describeExternalTopics() {
        try {
            AdminClient create = AdminClient.create(getStandardProps());
            Throwable th = null;
            try {
                Map<String, TopicDescription> map = (Map) create.describeTopics((List) ((Collection) create.listTopics().listings().get()).stream().filter(topicListing -> {
                    return !topicListing.isInternal();
                }).map((v0) -> {
                    return v0.name();
                }).collect(Collectors.toList())).all().get();
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return map;
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to list Kafka topics", e);
        }
    }

    private void logTopicPartitionStatus(Map<String, TopicDescription> map) {
        Properties standardProps = getStandardProps();
        standardProps.setProperty("group.id", "flink-tests-debugging");
        standardProps.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        standardProps.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
        KafkaConsumer kafkaConsumer = new KafkaConsumer(standardProps);
        ArrayList arrayList = new ArrayList();
        map.forEach((str, topicDescription) -> {
            topicDescription.partitions().forEach(topicPartitionInfo -> {
                arrayList.add(new TopicPartition(str, topicPartitionInfo.partition()));
            });
        });
        Map beginningOffsets = kafkaConsumer.beginningOffsets(arrayList);
        Map endOffsets = kafkaConsumer.endOffsets(arrayList);
        arrayList.forEach(topicPartition -> {
            LOG.info("TopicPartition \"{}\": starting offset: {}, stopping offset: {}", new Object[]{topicPartition, beginningOffsets.get(topicPartition), endOffsets.get(topicPartition)});
        });
    }

    public static Properties getStandardProps() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
        properties.put("group.id", "paimon-tests");
        properties.put("enable.auto.commit", false);
        properties.put("auto.offset.reset", "earliest");
        properties.put("max.partition.fetch.bytes", 256);
        properties.put("zookeeper.session.timeout.ms", Integer.valueOf(zkTimeoutMills));
        properties.put("zookeeper.connection.timeout.ms", Integer.valueOf(zkTimeoutMills));
        properties.put("default.api.timeout.ms", "120000");
        return properties;
    }

    public String getBootstrapServers() {
        return KAFKA_CONTAINER.getBootstrapServers();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, String> getBasicKafkaConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("properties.bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
        hashMap.put("properties.group.id", "paimon-tests");
        hashMap.put("properties.enable.auto.commit", "false");
        hashMap.put("properties.auto.offset.reset", "earliest");
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaSyncTableActionBuilder syncTableActionBuilder(Map<String, String> map) {
        return new KafkaSyncTableActionBuilder(map);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaSyncDatabaseActionBuilder syncDatabaseActionBuilder(Map<String, String> map) {
        return new KafkaSyncDatabaseActionBuilder(map);
    }

    public void createTestTopic(String str, int i, int i2) {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", getBootstrapServers());
        try {
            AdminClient create = AdminClient.create(hashMap);
            Throwable th = null;
            try {
                try {
                    create.createTopics(Collections.singletonList(new NewTopic(str, i, (short) i2))).all().get();
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new IllegalStateException(String.format("Fail to create topic [%s partitions: %d replication factor: %d].", str, Integer.valueOf(i), Integer.valueOf(i2)), e);
        }
    }

    public static List<String> readLines(String str) throws IOException {
        URL resource = KafkaCanalSyncTableActionITCase.class.getClassLoader().getResource(str);
        Assertions.assertThat(resource).isNotNull();
        return Files.readAllLines(new File(resource.getFile()).toPath());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writeRecordsToKafka(String str, List<String> list) throws Exception {
        Properties standardProps = getStandardProps();
        standardProps.setProperty("retries", "0");
        standardProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        standardProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(standardProps);
        for (int i = 0; i < list.size(); i++) {
            try {
                this.objectMapper.readTree(list.get(i));
                if (!StringUtils.isEmpty(list.get(i))) {
                    kafkaProducer.send(new ProducerRecord(str, list.get(i)));
                }
            } catch (Exception e) {
            }
        }
        kafkaProducer.close();
    }
}
