package org.apache.paimon.flink.action.cdc.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema;
import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg;
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.paimon.shade.org.apache.commons.lang3.BooleanUtils;
import org.apache.paimon.utils.StringUtils;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.class */
public class KafkaActionUtils {
    public static final String PROPERTIES_PREFIX = "properties.";
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode;
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$connectors$kafka$table$KafkaConnectorOptions$ScanStartupMode = new int[KafkaConnectorOptions.ScanStartupMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$table$KafkaConnectorOptions$ScanStartupMode[KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$table$KafkaConnectorOptions$ScanStartupMode[KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$table$KafkaConnectorOptions$ScanStartupMode[KafkaConnectorOptions.ScanStartupMode.GROUP_OFFSETS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$table$KafkaConnectorOptions$ScanStartupMode[KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$table$KafkaConnectorOptions$ScanStartupMode[KafkaConnectorOptions.ScanStartupMode.TIMESTAMP.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode = new int[StartupMode.values().length];
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.EARLIEST.ordinal()] = 1;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.LATEST.ordinal()] = 2;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.GROUP_OFFSETS.ordinal()] = 3;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.SPECIFIC_OFFSETS.ordinal()] = 4;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.TIMESTAMP.ordinal()] = 5;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils$KafkaConsumerWrapper.class */
    private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final String topic;

        KafkaConsumerWrapper(KafkaConsumer<byte[], byte[]> kafkaConsumer, String str) {
            this.consumer = kafkaConsumer;
            this.topic = str;
        }

        @Override // org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.ConsumerWrapper
        public List<CdcSourceRecord> getRecords(int i) {
            ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(i));
            CdcJsonDeserializationSchema cdcJsonDeserializationSchema = new CdcJsonDeserializationSchema();
            return (List) StreamSupport.stream(poll.records(this.topic).spliterator(), false).map(consumerRecord -> {
                try {
                    return cdcJsonDeserializationSchema.m208deserialize((byte[]) consumerRecord.value());
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).collect(Collectors.toList());
        }

        @Override // org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.ConsumerWrapper
        public String topic() {
            return this.topic;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.consumer.close();
        }
    }

    public static KafkaSource<CdcSourceRecord> buildKafkaSource(Configuration configuration) {
        KafkaSourceBuilder builder = KafkaSource.builder();
        if (configuration.contains(KafkaConnectorOptions.TOPIC)) {
            builder.setTopics((List) ((List) configuration.get(KafkaConnectorOptions.TOPIC)).stream().flatMap(str -> {
                return Arrays.stream(str.split(FieldListaggAgg.DELIMITER));
            }).collect(Collectors.toList()));
        } else {
            builder.setTopicPattern(Pattern.compile((String) configuration.get(KafkaConnectorOptions.TOPIC_PATTERN)));
        }
        builder.setValueOnlyDeserializer(new CdcJsonDeserializationSchema()).setGroupId(kafkaPropertiesGroupId(configuration));
        Properties createKafkaProperties = createKafkaProperties(configuration);
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[fromOption((KafkaConnectorOptions.ScanStartupMode) configuration.get(KafkaConnectorOptions.SCAN_STARTUP_MODE)).ordinal()]) {
            case 1:
                builder.setStartingOffsets(OffsetsInitializer.earliest());
                break;
            case 2:
                builder.setStartingOffsets(OffsetsInitializer.latest());
                break;
            case 3:
                builder.setStartingOffsets(OffsetsInitializer.committedOffsets(getResetStrategy(createKafkaProperties.getProperty("auto.offset.reset", OffsetResetStrategy.NONE.name()))));
                break;
            case 4:
                HashMap hashMap = new HashMap();
                String str2 = (String) ((List) configuration.get(KafkaConnectorOptions.TOPIC)).get(0);
                parseSpecificOffsets((String) configuration.get(KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS), KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS.key()).forEach((num, l) -> {
                    hashMap.put(new TopicPartition(str2, num.intValue()), l);
                });
                builder.setStartingOffsets(OffsetsInitializer.offsets(hashMap));
                break;
            case 5:
                builder.setStartingOffsets(OffsetsInitializer.timestamp(((Long) configuration.get(KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)).longValue()));
                break;
        }
        builder.setProperties(createKafkaProperties);
        return builder.build();
    }

    private static StartupMode fromOption(KafkaConnectorOptions.ScanStartupMode scanStartupMode) {
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$connectors$kafka$table$KafkaConnectorOptions$ScanStartupMode[scanStartupMode.ordinal()]) {
            case 1:
                return StartupMode.EARLIEST;
            case 2:
                return StartupMode.LATEST;
            case 3:
                return StartupMode.GROUP_OFFSETS;
            case 4:
                return StartupMode.SPECIFIC_OFFSETS;
            case 5:
                return StartupMode.TIMESTAMP;
            default:
                throw new TableException("Unsupported startup mode. Validator should have checked that.");
        }
    }

    private static OffsetResetStrategy getResetStrategy(String str) {
        return (OffsetResetStrategy) Arrays.stream(OffsetResetStrategy.values()).filter(offsetResetStrategy -> {
            return offsetResetStrategy.name().equals(str.toUpperCase(Locale.ROOT));
        }).findAny().orElseThrow(() -> {
            return new IllegalArgumentException(String.format("%s can not be set to %s. Valid values: [%s]", "auto.offset.reset", str, Arrays.stream(OffsetResetStrategy.values()).map((v0) -> {
                return v0.name();
            }).map((v0) -> {
                return v0.toLowerCase();
            }).collect(Collectors.joining(FieldListaggAgg.DELIMITER))));
        });
    }

    private static Map<Integer, Long> parseSpecificOffsets(String str, String str2) {
        HashMap hashMap = new HashMap();
        String[] split = str.split(CsvSchema.DEFAULT_ARRAY_ELEMENT_SEPARATOR);
        String format = String.format("Invalid properties '%s' should follow the format 'partition:0,offset:42;partition:1,offset:300', but is '%s'.", str2, str);
        if (split.length == 0) {
            throw new ValidationException(format);
        }
        for (String str3 : split) {
            if (null == str3 || !str3.contains(FieldListaggAgg.DELIMITER)) {
                throw new ValidationException(format);
            }
            String[] split2 = str3.split(FieldListaggAgg.DELIMITER);
            if (split2.length != 2 || !split2[0].startsWith("partition:") || !split2[1].startsWith("offset:")) {
                throw new ValidationException(format);
            }
            try {
                hashMap.put(Integer.valueOf(split2[0].substring(split2[0].indexOf(":") + 1)), Long.valueOf(split2[1].substring(split2[1].indexOf(":") + 1)));
            } catch (NumberFormatException e) {
                throw new ValidationException(format, e);
            }
        }
        return hashMap;
    }

    private static String kafkaPropertiesGroupId(Configuration configuration) {
        String str = (String) configuration.get(KafkaConnectorOptions.PROPS_GROUP_ID);
        if (StringUtils.isEmpty(str)) {
            str = UUID.randomUUID().toString();
            configuration.set(KafkaConnectorOptions.PROPS_GROUP_ID, str);
        }
        return str;
    }

    public static DataFormat getDataFormat(Configuration configuration) {
        return DataFormat.fromConfigString((String) configuration.get(KafkaConnectorOptions.VALUE_FORMAT));
    }

    public static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(Configuration configuration) {
        Properties createKafkaProperties = createKafkaProperties(configuration);
        createKafkaProperties.put("bootstrap.servers", configuration.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS));
        createKafkaProperties.put("group.id", kafkaPropertiesGroupId(configuration));
        createKafkaProperties.put("key.deserializer", ByteArrayDeserializer.class.getName());
        createKafkaProperties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        createKafkaProperties.put("auto.offset.reset", "earliest");
        createKafkaProperties.put("enable.auto.commit", BooleanUtils.FALSE);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(createKafkaProperties);
        String findOneTopic = configuration.contains(KafkaConnectorOptions.TOPIC) ? (String) ((List) configuration.get(KafkaConnectorOptions.TOPIC)).get(0) : findOneTopic(createKafkaProperties, (String) configuration.get(KafkaConnectorOptions.TOPIC_PATTERN));
        List partitionsFor = kafkaConsumer.partitionsFor(findOneTopic);
        if (partitionsFor == null || partitionsFor.isEmpty()) {
            throw new IllegalArgumentException(String.format("Failed to find partition information for topic '%s'. Please check your 'topic' and 'bootstrap.servers' config.", findOneTopic));
        }
        List singletonList = Collections.singletonList(new TopicPartition(findOneTopic, ((Integer) partitionsFor.stream().map((v0) -> {
            return v0.partition();
        }).sorted().findFirst().get()).intValue()));
        kafkaConsumer.assign(singletonList);
        kafkaConsumer.seekToBeginning(singletonList);
        return new KafkaConsumerWrapper(kafkaConsumer, findOneTopic);
    }

    private static Properties createKafkaProperties(Configuration configuration) {
        Properties properties = new Properties();
        properties.putAll(OptionsUtils.convertToPropertiesPrefixKey(configuration.toMap(), PROPERTIES_PREFIX));
        return properties;
    }

    private static String findOneTopic(Properties properties, String str) {
        Pattern compile = Pattern.compile(str);
        try {
            AdminClient create = AdminClient.create(properties);
            Throwable th = null;
            try {
                try {
                    for (String str2 : (Set) create.listTopics().names().get()) {
                        if (compile.matcher(str2).matches()) {
                            if (create != null) {
                                if (0 != 0) {
                                    try {
                                        create.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    create.close();
                                }
                            }
                            return str2;
                        }
                    }
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            create.close();
                        }
                    }
                    throw new RuntimeException("Cannot find topics match the topic-pattern " + str);
                } finally {
                }
            } catch (Throwable th4) {
                if (create != null) {
                    if (th != null) {
                        try {
                            create.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th4;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
