/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.kafka;

import java.util.HashSet;
import java.util.Set;
import net.wicp.tams.common.constant.Middleware;
import net.wicp.tams.common.flink.common.CommonOptions;
import net.wicp.tams.common.flink.connector.kafka.KafkaOptions;
import net.wicp.tams.common.flink.connector.kafka.sink.KafkaDynamicTableSink;
import net.wicp.tams.common.flink.connector.kafka.source.KafkaDynamicTableSource;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;

public class KafkaDynamicTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    public String factoryIdentifier() {
        return Middleware.tamskafka.getDesc();
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet options = new HashSet();
        options.add(KafkaOptions.kafkaservice);
        return options;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet options = new HashSet();
        options.add(KafkaOptions.topic);
        options.add(FactoryUtil.FORMAT);
        options.add(CommonOptions.rowKindColName);
        options.add(KafkaOptions.offset);
        return options;
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validate();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        Configuration options = (Configuration)helper.getOptions();
        KafkaOptions.packageOptionsSink((ReadableConfig)options);
        if (!options.getOptional(FactoryUtil.FORMAT).isPresent()) {
            options.set(FactoryUtil.FORMAT, (Object)"duckula");
        }
        EncodingFormat encodingFormat = helper.discoverEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT);
        return new KafkaDynamicTableSink(schema, (EncodingFormat<SerializationSchema<RowData>>)encodingFormat, options, context.getCatalogTable().getPartitionKeys());
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validate();
        Configuration options = (Configuration)helper.getOptions();
        KafkaOptions.packageOptionsSource((ReadableConfig)options);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        String tableName = context.getObjectIdentifier().getObjectName();
        DecodingFormat discoverDecodingFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT);
        return new KafkaDynamicTableSource(schema, (DecodingFormat<DeserializationSchema<RowData>>)discoverDecodingFormat, options, tableName);
    }
}

