/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Preconditions;

abstract class KafkaTableSource
implements StreamTableSource<Row> {
    private final String topic;
    private final Properties properties;
    private final DeserializationSchema<Row> deserializationSchema;
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;

    KafkaTableSource(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, String[] fieldNames, Class<?>[] fieldTypes) {
        this(topic, properties, deserializationSchema, fieldNames, KafkaTableSource.toTypeInfo(fieldTypes));
    }

    KafkaTableSource(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.topic = (String)Preconditions.checkNotNull((Object)topic, (String)"Topic");
        this.properties = (Properties)Preconditions.checkNotNull((Object)properties, (String)"Properties");
        this.deserializationSchema = (DeserializationSchema)Preconditions.checkNotNull(deserializationSchema, (String)"Deserialization schema");
        this.fieldNames = (String[])Preconditions.checkNotNull((Object)fieldNames, (String)"Field names");
        this.fieldTypes = (TypeInformation[])Preconditions.checkNotNull(fieldTypes, (String)"Field types");
        Preconditions.checkArgument((fieldNames.length == fieldTypes.length ? 1 : 0) != 0, (Object)"Number of provided field names and types does not match.");
    }

    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        FlinkKafkaConsumerBase<Row> kafkaConsumer = this.getKafkaConsumer(this.topic, this.properties, this.deserializationSchema);
        DataStreamSource kafkaSource = env.addSource(kafkaConsumer);
        return kafkaSource;
    }

    public int getNumberOfFields() {
        return this.fieldNames.length;
    }

    public String[] getFieldsNames() {
        return this.fieldNames;
    }

    public TypeInformation<?>[] getFieldTypes() {
        return this.fieldTypes;
    }

    public TypeInformation<Row> getReturnType() {
        return new RowTypeInfo(this.fieldTypes);
    }

    abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(String var1, Properties var2, DeserializationSchema<Row> var3);

    protected DeserializationSchema<Row> getDeserializationSchema() {
        return this.deserializationSchema;
    }

    private static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
        TypeInformation[] typeInfos = new TypeInformation[fieldTypes.length];
        for (int i = 0; i < fieldTypes.length; ++i) {
            typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
        }
        return typeInfos;
    }
}

