/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.kafka.connector;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import net.wicp.tams.common.apiext.StringUtil;
import net.wicp.tams.common.constant.FieldFormart;
import net.wicp.tams.common.constant.ods.AddColName;
import net.wicp.tams.common.exception.ExceptAll;
import net.wicp.tams.common.exception.IExcept;
import net.wicp.tams.common.exception.ProjectExceptionRuntime;
import net.wicp.tams.common.flink.common.constant.EnvName;
import net.wicp.tams.common.flink.common.constant.FlinkTypeEnum;
import net.wicp.tams.common.flink.connector.kafka.KafkaOptions;
import net.wicp.tams.common.flink.connector.kafka.connector.KafkaConsumerListerWrap;
import net.wicp.tams.common.kafka.KafkaAdmin;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.Preconditions;

public class KafkaScanFunction
extends RichSourceFunction<RowData>
implements ResultTypeQueryable<RowData>,
CheckpointedFunction {
    private static final long serialVersionUID = 1L;
    private final Configuration optionsWith;
    private final List<RowType.RowField> rowTypeFields;
    private final List<String> keys;
    private final String groupId;
    private final List<String> addColNames = new ArrayList<String>();
    private transient FieldFormart fieldFormart;
    private final DeserializationSchema<RowData> deserializationSchema;

    public KafkaScanFunction(Configuration optionsWith, List<RowType.RowField> rowTypeFields, UniqueConstraint primaryKey, String tableName, DeserializationSchema<RowData> deserializationSchema) {
        this.optionsWith = optionsWith;
        this.rowTypeFields = rowTypeFields;
        this.deserializationSchema = deserializationSchema;
        this.keys = primaryKey == null ? null : primaryKey.getColumns();
        this.fieldFormart = FieldFormart.ori;
        String groupid = EnvName.tamskafkagroupid.getValue();
        this.groupId = StringUtil.hasNull((String[])new String[]{groupid, tableName});
        List allColList = AddColName.getAllColNameTrue((FieldFormart)this.fieldFormart);
        for (RowType.RowField rowField : rowTypeFields) {
            FlinkTypeEnum flinkTypeEnum;
            if (allColList.contains(rowField.getName())) {
                this.addColNames.add(rowField.getName());
            }
            if ((flinkTypeEnum = FlinkTypeEnum.findByFlinkRowType((String)rowField.getType().getTypeRoot().toString())) != null) continue;
            throw new ProjectExceptionRuntime((IExcept)ExceptAll.param_notfit, "\u5217\uff1a\u3010" + rowField.getName() + "\u3011\u4e0d\u652f\u6301\u7684\u7c7b\u578b\u3010" + rowField.getType().getTypeRoot().toString() + "\u3011");
        }
    }

    public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
        KafkaConsumerListerWrap lister = new KafkaConsumerListerWrap(this.groupId, this.optionsWith.getString(KafkaOptions.topic), this.deserializationSchema, ctx);
        lister.start();
    }

    public static int[] createValueFormatProjection(DataType physicalDataType) {
        LogicalType physicalType = physicalDataType.getLogicalType();
        Preconditions.checkArgument((physicalType.getTypeRoot() == LogicalTypeRoot.ROW ? 1 : 0) != 0, (Object)"Row data type expected.");
        int physicalFieldCount = LogicalTypeChecks.getFieldCount((LogicalType)physicalType);
        IntStream physicalFields = IntStream.range(0, physicalFieldCount);
        return physicalFields.toArray();
    }

    public void cancel() {
    }

    public TypeInformation<RowData> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    public void setRuntimeContext(RuntimeContext t) {
        super.setRuntimeContext(t);
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        KafkaOptions.packageOptionsSource((ReadableConfig)this.optionsWith);
        KafkaAdmin admin = new KafkaAdmin(this.optionsWith.getString(KafkaOptions.kafkaservice));
        admin.initGroupId(this.optionsWith.getString(KafkaOptions.topic), this.groupId);
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
    }
}

