package org.apache.paimon.flink.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.log.LogSourceProvider;

/* loaded from: input_file:org/apache/paimon/flink/kafka/KafkaLogSourceProvider.class */
/**
 * A {@link LogSourceProvider} that builds a Flink {@link KafkaSource} emitting {@link RowData}
 * records read from a single Kafka topic.
 *
 * <p>The provider translates the configured {@link CoreOptions.LogConsistency} into the Kafka
 * {@code isolation.level} consumer property and the configured {@link CoreOptions.StartupMode}
 * into an {@link OffsetsInitializer}.
 */
public class KafkaLogSourceProvider implements LogSourceProvider {
    private static final long serialVersionUID = 1L;

    /** Kafka consumer property selecting which transactional records are visible to the reader. */
    private static final String ISOLATION_LEVEL_CONFIG = "isolation.level";

    /** Name of the Kafka topic to read from. */
    private final String topic;

    /** Kafka consumer properties; {@link #createSource(Map)} writes the isolation level into it. */
    private final Properties properties;

    /** Row type handed to the {@link KafkaLogDeserializationSchema}. */
    private final DataType physicalType;

    /** Primary key field indexes handed to the {@link KafkaLogDeserializationSchema}. */
    private final int[] primaryKey;

    /** Deserializer for the record key; may be absent. */
    @Nullable
    private final DeserializationSchema<RowData> primaryKeyDeserializer;

    /** Deserializer for the record value. */
    private final DeserializationSchema<RowData> valueDeserializer;

    /** Optional projection handed to the {@link KafkaLogDeserializationSchema}. */
    @Nullable
    private final int[][] projectFields;

    /** Consistency level that decides the Kafka isolation level. */
    private final CoreOptions.LogConsistency consistency;

    /** Startup mode that decides where reading begins. */
    private final CoreOptions.StartupMode scanMode;

    /** Start timestamp; required when {@link #scanMode} is {@code FROM_TIMESTAMP}. */
    @Nullable
    private final Long timestampMills;

    /**
     * Creates a provider. All arguments are stored as given, without copying.
     *
     * @param topic Kafka topic to read
     * @param properties Kafka consumer properties; mutated later by {@link #createSource(Map)}
     * @param physicalType row type passed to the log deserialization schema
     * @param primaryKey primary key field indexes passed to the log deserialization schema
     * @param primaryKeyDeserializer deserializer for record keys, or {@code null}
     * @param valueDeserializer deserializer for record values
     * @param projectFields projection passed to the log deserialization schema, or {@code null}
     * @param consistency consistency level used to choose the Kafka isolation level
     * @param scanMode startup mode used to choose the starting offsets
     * @param timestampMills start timestamp, only needed for {@code FROM_TIMESTAMP}
     */
    public KafkaLogSourceProvider(
            String topic,
            Properties properties,
            DataType physicalType,
            int[] primaryKey,
            @Nullable DeserializationSchema<RowData> primaryKeyDeserializer,
            DeserializationSchema<RowData> valueDeserializer,
            @Nullable int[][] projectFields,
            CoreOptions.LogConsistency consistency,
            CoreOptions.StartupMode scanMode,
            @Nullable Long timestampMills) {
        this.topic = topic;
        this.properties = properties;
        this.physicalType = physicalType;
        this.primaryKey = primaryKey;
        this.primaryKeyDeserializer = primaryKeyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.projectFields = projectFields;
        this.consistency = consistency;
        this.scanMode = scanMode;
        this.timestampMills = timestampMills;
    }

    /**
     * Builds a {@link KafkaSource} for the configured topic.
     *
     * <p>Side effect: the {@code isolation.level} entry of the {@link Properties} instance given
     * to the constructor is overwritten according to the consistency level.
     *
     * @param partitionOffsets starting offset per partition index of the topic, or {@code null};
     *     only consulted when the startup mode is {@code LATEST_FULL}
     * @return a source using a freshly generated random consumer group id
     * @throws NullPointerException if the startup mode is {@code FROM_TIMESTAMP} and no timestamp
     *     was configured
     * @throws UnsupportedOperationException if the startup mode is not supported
     */
    public KafkaSource<RowData> createSource(@Nullable Map<Integer, Long> partitionOffsets) {
        switch (consistency) {
            case TRANSACTIONAL:
                properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed");
                break;
            case EVENTUAL:
                properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_uncommitted");
                break;
            default:
                // Any other level keeps whatever isolation level the properties already carry.
                break;
        }

        // setGroupId must stay after setProperties so the random id is not overridden by a
        // group id that may be present in the user supplied properties.
        return KafkaSource.<RowData>builder()
                .setTopics(topic)
                .setStartingOffsets(toOffsetsInitializer(partitionOffsets))
                .setProperties(properties)
                .setDeserializer(createDeserializationSchema())
                .setGroupId(UUID.randomUUID().toString())
                .build();
    }

    /**
     * Wraps a {@link KafkaLogDeserializationSchema}, configured from this provider's fields, into
     * a {@link KafkaRecordDeserializationSchema}.
     */
    @VisibleForTesting
    KafkaRecordDeserializationSchema<RowData> createDeserializationSchema() {
        return KafkaRecordDeserializationSchema.of(
                new KafkaLogDeserializationSchema(
                        physicalType,
                        primaryKey,
                        primaryKeyDeserializer,
                        valueDeserializer,
                        projectFields));
    }

    /**
     * Maps the startup mode to the matching {@link OffsetsInitializer}.
     *
     * <ul>
     *   <li>{@code LATEST_FULL}: the given offsets, or the earliest offsets when none are given
     *   <li>{@code LATEST}: the latest offsets
     *   <li>{@code FROM_TIMESTAMP}: offsets resolved from the configured timestamp
     * </ul>
     */
    private OffsetsInitializer toOffsetsInitializer(
            @Nullable Map<Integer, Long> partitionOffsets) {
        switch (scanMode) {
            case LATEST_FULL:
                return partitionOffsets == null
                        ? OffsetsInitializer.earliest()
                        : OffsetsInitializer.offsets(toKafkaOffsets(partitionOffsets));
            case LATEST:
                return OffsetsInitializer.latest();
            case FROM_TIMESTAMP:
                if (timestampMills == null) {
                    throw new NullPointerException(
                            "Must specify a timestamp if you choose timestamp startup mode.");
                }
                return OffsetsInitializer.timestamp(timestampMills.longValue());
            default:
                throw new UnsupportedOperationException("Unsupported mode: " + scanMode);
        }
    }

    /**
     * Converts offsets keyed by partition index into offsets keyed by {@link TopicPartition} of
     * this provider's topic.
     */
    private Map<TopicPartition, Long> toKafkaOffsets(Map<Integer, Long> partitionOffsets) {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        partitionOffsets.forEach(
                (partition, offset) -> offsets.put(new TopicPartition(topic, partition), offset));
        return offsets;
    }

    /**
     * Raw-typed bridge that forwards to {@link #createSource(Map)}.
     */
    /* renamed from: createSource, reason: collision with other method in class */
    @SuppressWarnings("unchecked") // the raw map is forwarded unchanged to the typed overload
    public /* bridge */ /* synthetic */ Source m26createSource(@Nullable Map map) {
        return createSource((Map<Integer, Long>) map);
    }
}
