package org.apache.gobblin.source.extractor.extract.kafka;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.extract.EventBasedSource;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.class */
public class KafkaSimpleStreamingSource<S, D> extends EventBasedSource<S, RecordEnvelope<D>> {
    public static final String TOPIC_WHITELIST = "gobblin.streaming.kafka.topic.singleton";
    public static final String TOPIC_KEY_DESERIALIZER = "gobblin.streaming.kafka.topic.key.deserializer";
    public static final String TOPIC_VALUE_DESERIALIZER = "gobblin.streaming.kafka.topic.value.deserializer";
    public static final String KAFKA_CONSUMER_CONFIG_PREFIX = "gobblin.streaming.kafka.consumerConfig";
    private static final String TOPIC_NAME = "topic.name";
    private static final String PARTITION_ID = "partition.id";
    private final Closer closer = Closer.create();
    public static final String DEFAULT_NAMESPACE_NAME = "KAFKA";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSimpleStreamingSource.class);
    public static final Extract.TableType DEFAULT_TABLE_TYPE = Extract.TableType.APPEND_ONLY;

    public static String getTopicNameFromState(State state) {
        return state.getProp(TOPIC_NAME);
    }

    public static int getPartitionIdFromState(State state) {
        return state.getPropAsInt(PARTITION_ID);
    }

    public static void setTopicNameInState(State state, String str) {
        state.setProp(TOPIC_NAME, str);
    }

    public static void setPartitionId(State state, int i) {
        state.setProp(PARTITION_ID, Integer.valueOf(i));
    }

    public static Consumer getKafkaConsumer(Config config) {
        List stringList = ConfigUtils.getStringList(config, "kafka.brokers");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", Joiner.on(",").join(stringList));
        properties.put("group.id", ConfigUtils.getString(config, "job.name", ""));
        properties.put("enable.auto.commit", "false");
        Preconditions.checkArgument(config.hasPath(TOPIC_KEY_DESERIALIZER));
        properties.put("key.deserializer", config.getString(TOPIC_KEY_DESERIALIZER));
        Preconditions.checkArgument(config.hasPath(TOPIC_VALUE_DESERIALIZER));
        properties.put("value.deserializer", config.getString(TOPIC_VALUE_DESERIALIZER));
        properties.putAll(ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(config, KAFKA_CONSUMER_CONFIG_PREFIX)));
        try {
            return new KafkaConsumer(properties);
        } catch (Exception e) {
            LOG.error("Exception when creating Kafka consumer - {}", e);
            throw Throwables.propagate(e);
        }
    }

    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        Config propertiesToConfig = ConfigUtils.propertiesToConfig(sourceState.getProperties());
        Consumer kafkaConsumer = getKafkaConsumer(propertiesToConfig);
        LOG.debug("Consumer is {}", kafkaConsumer);
        String string = ConfigUtils.getString(propertiesToConfig, TOPIC_WHITELIST, "");
        ArrayList arrayList = new ArrayList();
        List<PartitionInfo> partitionsFor = kafkaConsumer.partitionsFor(string);
        LOG.info("Partition count is {}", Integer.valueOf(partitionsFor.size()));
        for (PartitionInfo partitionInfo : partitionsFor) {
            Extract createExtract = createExtract(DEFAULT_TABLE_TYPE, DEFAULT_NAMESPACE_NAME, partitionInfo.topic());
            LOG.info("Partition info is {}", partitionInfo);
            WorkUnit create = WorkUnit.create(createExtract);
            setTopicNameInState(create, partitionInfo.topic());
            create.setProp("extract.table.name", partitionInfo.topic());
            setPartitionId(create, partitionInfo.partition());
            arrayList.add(create);
        }
        return arrayList;
    }

    public Extractor getExtractor(WorkUnitState workUnitState) throws IOException {
        return new KafkaSimpleStreamingExtractor(workUnitState);
    }
}
