package org.apache.seatunnel.flink.kafka.source;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.auto.service.AutoService;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import org.apache.seatunnel.common.PropertiesUtil;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.enums.FormatType;
import org.apache.seatunnel.flink.stream.FlinkStreamSource;
import org.apache.seatunnel.flink.util.SchemaUtil;
import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink streaming source that registers a Kafka topic as a temporary table and exposes it as a
 * {@code DataStream<Row>}.
 *
 * <p>Configuration keys read by this class:
 * <ul>
 *   <li>{@code topics}, {@code schema}, {@code format.type}, {@code result_table_name} - required;</li>
 *   <li>{@code consumer.bootstrap.servers}, {@code consumer.group.id} - required;</li>
 *   <li>{@code rowtime.field}, {@code watermark} - optional, used to declare a rowtime attribute;</li>
 *   <li>{@code offset.reset} - optional, one of {@code latest}, {@code earliest}, {@code specific};
 *       {@code specific} additionally reads {@code offset.reset.specific} as a JSON object.</li>
 * </ul>
 */
@AutoService({BaseFlinkSource.class})
/* loaded from: input_file:org/apache/seatunnel/flink/kafka/source/KafkaTableStream.class */
public class KafkaTableStream implements FlinkStreamSource {
    private static final long serialVersionUID = 5287018194573371428L;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTableStream.class);
    private Config config;
    private String topic;
    private Object schemaInfo;
    private String rowTimeField;
    private String tableName;
    private long watermark;
    private FormatType format;
    private static final String TOPICS = "topics";
    private static final String ROWTIME_FIELD = "rowtime.field";
    private static final String WATERMARK_VAL = "watermark";
    private static final String SCHEMA = "schema";
    private static final String SOURCE_FORMAT = "format.type";
    private static final String GROUP_ID = "group.id";
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    private static final String OFFSET_RESET = "offset.reset";
    private static final String OFFSET_RESET_SPECIFIC = "offset.reset.specific";
    private static final String RESULT_TABLE_NAME = "result_table_name";
    private static final int DEFAULT_INITIAL_CAPACITY = 16;
    private final Properties kafkaParams = new Properties();
    private final String consumerPrefix = "consumer.";

    /**
     * Stores the plugin configuration used by all later lifecycle calls.
     *
     * @param config plugin configuration
     */
    public void setConfig(Config config) {
        this.config = config;
    }

    /**
     * @return the configuration previously passed to {@link #setConfig(Config)}
     */
    public Config getConfig() {
        return this.config;
    }

    /**
     * Verifies that the mandatory top-level keys exist and, if they do, that the mandatory
     * {@code consumer.}-prefixed keys exist as well.
     *
     * @return the first failing check result, or the result of the consumer-key check
     */
    public CheckResult checkConfig() {
        CheckResult topLevelCheck = CheckConfigUtil.checkAllExists(
                this.config, new String[]{TOPICS, SCHEMA, SOURCE_FORMAT, RESULT_TABLE_NAME});
        if (!topLevelCheck.isSuccess()) {
            return topLevelCheck;
        }
        Config consumerConfig = TypesafeConfigUtils.extractSubConfig(this.config, this.consumerPrefix, false);
        return CheckConfigUtil.checkAllExists(consumerConfig, new String[]{BOOTSTRAP_SERVERS, GROUP_ID});
    }

    /**
     * Reads the configuration into the fields used when building the table source.
     *
     * @param flinkEnvironment execution environment (not used by this method)
     */
    public void prepare(FlinkEnvironment flinkEnvironment) {
        this.topic = this.config.getString(TOPICS);
        PropertiesUtil.setProperties(this.config, this.kafkaParams, this.consumerPrefix, false);
        this.tableName = this.config.getString(RESULT_TABLE_NAME);
        if (this.config.hasPath(ROWTIME_FIELD)) {
            this.rowTimeField = this.config.getString(ROWTIME_FIELD);
            // The watermark setting is only consulted when a rowtime field is configured.
            if (this.config.hasPath(WATERMARK_VAL)) {
                this.watermark = this.config.getLong(WATERMARK_VAL);
            }
        }
        String schemaContent = this.config.getString(SCHEMA);
        this.format = FormatType.from(this.config.getString(SOURCE_FORMAT).trim().toLowerCase());
        this.schemaInfo = JsonUtils.parseArray(schemaContent);
    }

    /**
     * @return the plugin identifier, {@code "KafkaTableStream"}
     */
    public String getPluginName() {
        return "KafkaTableStream";
    }

    /**
     * Registers the Kafka topic as a temporary append-mode table named after
     * {@code result_table_name} and converts that table to a data stream.
     *
     * @param flinkEnvironment environment providing the {@code StreamTableEnvironment}
     * @return the rows of the registered table as a stream
     */
    /* renamed from: getData, reason: merged with bridge method [inline-methods] */
    public DataStream<Row> m590getData(FlinkEnvironment flinkEnvironment) {
        StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
        tableEnvironment
                .connect(getKafkaConnect())
                .withFormat(setFormat())
                .withSchema(getSchema())
                .inAppendMode()
                .createTemporaryTable(this.tableName);
        return TableUtil.tableToDataStream(
                tableEnvironment, tableEnvironment.scan(new String[]{this.tableName}), true);
    }

    /**
     * Builds the table schema descriptor and, when a rowtime field is configured, attaches a
     * rowtime attribute with a periodic bounded watermark.
     */
    private Schema getSchema() {
        Schema schema = new Schema();
        SchemaUtil.setSchema(schema, this.schemaInfo, this.format);
        if (StringUtils.isNotBlank(this.rowTimeField)) {
            Rowtime rowtime = new Rowtime();
            rowtime.timestampsFromField(this.rowTimeField);
            rowtime.watermarksPeriodicBounded(this.watermark);
            schema.rowtime(rowtime);
        }
        return schema;
    }

    /**
     * Builds the Kafka connector descriptor from the topic, the consumer properties and the
     * optional {@code offset.reset} start position.
     */
    private Kafka getKafkaConnect() {
        Kafka kafka = new Kafka().version(KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL);
        kafka.topic(this.topic);
        kafka.properties(this.kafkaParams);
        if (this.config.hasPath(OFFSET_RESET)) {
            String offsetReset = this.config.getString(OFFSET_RESET);
            switch (offsetReset) {
                case "latest":
                    kafka.startFromLatest();
                    break;
                case "earliest":
                    kafka.startFromEarliest();
                    break;
                case "specific":
                    // The anonymous TypeReference subclass captures the generic map type for parsing.
                    Map<Integer, Long> specificOffsets = JsonUtils.parseObject(
                            this.config.getString(OFFSET_RESET_SPECIFIC),
                            new TypeReference<Map<Integer, Long>>() {
                            });
                    kafka.startFromSpecificOffsets(specificOffsets);
                    break;
                default:
                    // Unrecognised values leave the connector's own start position untouched.
                    LOGGER.warn("Unsupported '{}' value '{}', start position is left unchanged",
                            OFFSET_RESET, offsetReset);
                    break;
            }
        }
        return kafka;
    }

    /**
     * Resolves the format descriptor for the configured format type.
     *
     * @throws RuntimeException if the format descriptor cannot be created; the original
     *                          exception is attached as the cause
     */
    private FormatDescriptor setFormat() {
        try {
            return SchemaUtil.setFormat(this.format, this.config);
        } catch (Exception e) {
            LOGGER.warn("set format exception", e);
            // Keep the cause so callers can see why the format configuration was rejected.
            throw new RuntimeException("format config error", e);
        }
    }
}
