/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.util.Utf8;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.AvroTypeInfo;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;

/**
 * Abstract Kafka table source whose records are described by an Avro
 * {@link SpecificRecordBase} class.
 *
 * <p>The constructor derives two things from the given Avro class and hands them to
 * {@link KafkaTableSource}: an {@link AvroRowDeserializationSchema} and a
 * {@link Row} type description built from the class's Avro {@link Schema}.
 */
public abstract class KafkaAvroTableSource
extends KafkaTableSource {
    /**
     * Creates the table source for the given topic.
     *
     * @param topic      forwarded unchanged to the {@link KafkaTableSource} constructor
     * @param properties forwarded unchanged to the {@link KafkaTableSource} constructor
     * @param avroClass  Avro specific-record class used to build both the deserialization
     *                   schema and the row type information
     */
    KafkaAvroTableSource(String topic, Properties properties, Class<? extends SpecificRecordBase> avroClass) {
        super(topic, properties, createDeserializationSchema(avroClass), convertToRowTypeInformation(avroClass));
    }

    /**
     * Builds the deserialization schema for the given Avro record class.
     *
     * @param record Avro specific-record class
     * @return a new {@link AvroRowDeserializationSchema} constructed from {@code record}
     */
    private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
        return new AvroRowDeserializationSchema(record);
    }

    /**
     * Derives the {@link Row} type information for an Avro record class.
     *
     * <p>The field layout is taken from the Avro {@link Schema} obtained via
     * {@link SpecificData#getSchema}, while the per-field types come from the
     * {@link AvroTypeInfo} extracted for the same class.
     *
     * @param avroClass Avro specific-record class
     * @param <T>       concrete record type
     * @return type information describing the record as a {@link Row}
     */
    @SuppressWarnings("unchecked")
    private static <T extends SpecificRecordBase> TypeInformation<Row> convertToRowTypeInformation(Class<T> avroClass) {
        final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<T>(avroClass);
        final Schema schema = SpecificData.get().getSchema(avroClass);
        // Unchecked cast: convertToTypeInformation yields a RowTypeInfo whenever the schema
        // type is RECORD. NOTE(review): this relies on the schema of a SpecificRecordBase
        // class being a RECORD schema -- confirm against the Avro contract.
        return (TypeInformation<Row>) convertToTypeInformation(avroTypeInfo, schema);
    }

    /**
     * Recursively maps extracted type information onto the structure of an Avro schema.
     *
     * <ul>
     *   <li>For a {@code RECORD} schema, returns a {@link RowTypeInfo} with one entry per
     *       schema field, in the order reported by {@link Schema#getFields()}; each field
     *       type is converted recursively.</li>
     *   <li>For a {@link GenericTypeInfo} whose type class is {@link Utf8}, returns
     *       {@link BasicTypeInfo#STRING_TYPE_INFO}.</li>
     *   <li>Otherwise returns {@code extracted} unchanged.</li>
     * </ul>
     *
     * @param extracted type information extracted for the value; must be an
     *                  {@link AvroTypeInfo} when {@code schema} is a record
     * @param schema    Avro schema of the value
     * @return the converted type information
     * @throws ClassCastException if {@code schema} is a record but {@code extracted} is not
     *                            an {@link AvroTypeInfo}
     */
    private static TypeInformation<?> convertToTypeInformation(TypeInformation<?> extracted, Schema schema) {
        if (schema.getType() == Schema.Type.RECORD) {
            final List<Schema.Field> fields = schema.getFields();
            final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
            final int fieldCount = fields.size();
            final TypeInformation<?>[] types = new TypeInformation<?>[fieldCount];
            final String[] names = new String[fieldCount];
            for (int i = 0; i < fieldCount; ++i) {
                final Schema.Field field = fields.get(i);
                // Look the field up by name so the result follows the schema's field order.
                types[i] = convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
                names[i] = field.name();
            }
            return new RowTypeInfo(types, names);
        }
        if (extracted instanceof GenericTypeInfo) {
            final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
            if (genericTypeInfo.getTypeClass() == Utf8.class) {
                return BasicTypeInfo.STRING_TYPE_INFO;
            }
        }
        return extracted;
    }
}

