package org.apache.paimon.flink.action.cdc.watermark;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.utils.JsonSerdeUtil;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.class */
/**
 * Factory that maps a CDC source object to the {@link CdcTimestampExtractor} used to read a
 * timestamp out of that source's JSON records.
 *
 * <p>Lookup is keyed on the exact runtime class of the source ({@code obj.getClass()}), so a
 * subclass of a registered source type is not matched.
 */
public class CdcTimestampExtractorFactory implements Serializable {
    private static final long serialVersionUID = 1;

    /** Registry: source class -> supplier creating a fresh extractor for that source. */
    private static final Map<Class<?>, Supplier<CdcTimestampExtractor>> extractorMap =
            new HashMap<>();

    static {
        extractorMap.put(MongoDBSource.class, MongoDBCdcTimestampExtractor::new);
        extractorMap.put(MySqlSource.class, MysqlCdcTimestampExtractor::new);
        // Both message-queue sources share one extractor that sniffs the JSON layout per record.
        extractorMap.put(PulsarSource.class, MessageQueueCdcTimestampExtractor::new);
        extractorMap.put(KafkaSource.class, MessageQueueCdcTimestampExtractor::new);
    }

    /** Extracts a timestamp from a single CDC record given as a JSON string. */
    public interface CdcTimestampExtractor extends Serializable {
        /**
         * Reads the timestamp of one record.
         *
         * @param str the record as a JSON string
         * @return the extracted timestamp (presumably epoch milliseconds for every
         *     implementation; only the {@code op_ts} branch makes the unit explicit)
         * @throws JsonProcessingException declared for failures while processing the JSON
         */
        long extractTimestamp(String str) throws JsonProcessingException;
    }

    /**
     * Extractor registered for {@link KafkaSource} and {@link PulsarSource}. It probes each record
     * for marker fields to decide where the timestamp lives; the first matching probe wins.
     */
    public static class MessageQueueCdcTimestampExtractor implements CdcTimestampExtractor {
        private static final long serialVersionUID = 1;

        /**
         * Pattern of the {@code op_ts} field. {@link DateTimeFormatter} is immutable and
         * thread-safe, so it is created once instead of once per record.
         */
        private static final DateTimeFormatter OP_TS_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

        /**
         * {@inheritDoc}
         *
         * @throws RuntimeException if none of the known marker fields is present in the record
         */
        @Override
        public long extractTimestamp(String str) throws JsonProcessingException {
            // "mysqlType" marker (presumably Canal JSON): top-level "ts" is returned unchanged.
            if (hasNode(str, "mysqlType")) {
                return extractLong(str, "ts");
            }
            // "pos" marker (presumably OGG JSON): "op_ts" is a textual local date-time.
            if (hasNode(str, "pos")) {
                String opTs =
                        (String)
                                JsonSerdeUtil.extractValue(
                                        str, String.class, new String[] {"op_ts"});
                // NOTE(review): the value is interpreted in the JVM default zone, so the result
                // depends on where the job runs - confirm this matches the producer's zone.
                return LocalDateTime.parse(opTs, OP_TS_FORMATTER)
                        .atZone(ZoneId.systemDefault())
                        .toInstant()
                        .toEpochMilli();
            }
            // "xid" marker (presumably Maxwell JSON): "ts" is scaled by 1000, presumably
            // converting seconds to milliseconds.
            if (hasNode(str, "xid")) {
                return extractLong(str, "ts") * 1000;
            }
            // Envelope wrapped in "payload" (presumably Debezium JSON with schema included).
            if (hasNode(str, "payload", "source", "connector")) {
                return extractLong(str, "payload", "ts_ms");
            }
            // Same envelope without the "payload" wrapper.
            if (hasNode(str, "source", "connector")) {
                return extractLong(str, "ts_ms");
            }
            throw new RuntimeException(String.format("Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s", str));
        }
    }

    /** Extractor registered for {@link MongoDBSource}: returns the top-level {@code ts_ms}. */
    public static class MongoDBCdcTimestampExtractor implements CdcTimestampExtractor {
        private static final long serialVersionUID = 1;

        @Override
        public long extractTimestamp(String str) throws JsonProcessingException {
            return extractLong(str, "ts_ms");
        }
    }

    /** Extractor registered for {@link MySqlSource}: returns {@code payload.ts_ms}. */
    public static class MysqlCdcTimestampExtractor implements CdcTimestampExtractor {
        // Declared explicitly for consistency with the sibling extractors, which are equally
        // Serializable through CdcTimestampExtractor.
        private static final long serialVersionUID = 1;

        @Override
        public long extractTimestamp(String str) throws JsonProcessingException {
            return extractLong(str, "payload", "ts_ms");
        }
    }

    /**
     * Creates the timestamp extractor registered for the given source.
     *
     * @param obj the CDC source instance; its exact runtime class selects the extractor
     * @return a new extractor instance for that source type
     * @throws NullPointerException if {@code obj} is {@code null}
     * @throws IllegalArgumentException if no extractor is registered for the source's class
     */
    public static CdcTimestampExtractor createExtractor(Object obj) {
        if (obj == null) {
            // Same exception type as the implicit dereference failure, but with a usable message.
            throw new NullPointerException("source must not be null");
        }
        Supplier<CdcTimestampExtractor> supplier = extractorMap.get(obj.getClass());
        if (supplier != null) {
            return supplier.get();
        }
        throw new IllegalArgumentException("Unsupported source type: " + obj.getClass().getName());
    }

    /** Returns whether the node addressed by {@code path} exists in {@code json}. */
    private static boolean hasNode(String json, String... path) throws JsonProcessingException {
        return JsonSerdeUtil.isNodeExists(json, path);
    }

    /**
     * Reads the node addressed by {@code path} as a {@code long}.
     *
     * <p>NOTE(review): if {@code JsonSerdeUtil.extractValue} can return {@code null} for a
     * missing node, the unboxing here throws a {@link NullPointerException} - confirm.
     */
    private static long extractLong(String json, String... path) throws JsonProcessingException {
        return ((Long) JsonSerdeUtil.extractValue(json, Long.class, path)).longValue();
    }
}
