package gobblin.source.extractor.extract.kafka;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import gobblin.configuration.SourceState;
import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.kafka.client.GobblinKafkaConsumerClient;
import gobblin.source.extractor.extract.EventBasedSource;
import gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
import gobblin.source.workunit.Extract;
import gobblin.source.workunit.MultiWorkUnit;
import gobblin.source.workunit.WorkUnit;
import gobblin.util.ClassAliasResolver;
import gobblin.util.ConfigUtils;
import gobblin.util.DatasetFilterUtils;
import gobblin.util.ExecutorsUtils;
import gobblin.util.dataset.DatasetUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaSource.class */
public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
    // Configuration keys holding the topic blacklist / whitelist patterns used when discovering topics.
    public static final String TOPIC_BLACKLIST = "topic.blacklist";
    public static final String TOPIC_WHITELIST = "topic.whitelist";
    // Values accepted by BOOTSTRAP_WITH_OFFSET and RESET_ON_OFFSET_OUT_OF_RANGE
    // ("nearest" is only consulted for the out-of-range reset).
    public static final String LATEST_OFFSET = "latest";
    public static final String EARLIEST_OFFSET = "earliest";
    public static final String NEAREST_OFFSET = "nearest";
    // Where to start a partition that has no previous offset; any value other than
    // "latest"/"earliest" causes the partition to be skipped.
    public static final String BOOTSTRAP_WITH_OFFSET = "bootstrap.with.offset";
    public static final String DEFAULT_BOOTSTRAP_WITH_OFFSET = "latest";
    // Comma-separated topic names (matched case-insensitively) that should jump to the latest offset;
    // the special entry ALL_TOPICS applies to every topic.
    public static final String TOPICS_MOVE_TO_LATEST_OFFSET = "topics.move.to.latest.offset";
    // What to do when the previous offset lies outside [earliest, latest] for a partition.
    public static final String RESET_ON_OFFSET_OUT_OF_RANGE = "reset.on.offset.out.of.range";
    public static final String DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE = "nearest";
    // Property names written onto each created work unit.
    public static final String TOPIC_NAME = "topic.name";
    public static final String PARTITION_ID = "partition.id";
    public static final String LEADER_ID = "leader.id";
    public static final String LEADER_HOSTANDPORT = "leader.hostandport";
    // Extract namespace used unless customization is enabled and a namespace is configured.
    public static final String DEFAULT_NAMESPACE_NAME = "KAFKA";
    public static final String ALL_TOPICS = "all";
    // NOTE(review): not referenced in this class; presumably consumed by extractors/packers - confirm.
    public static final String AVG_RECORD_SIZE = "avg.record.size";
    public static final String AVG_RECORD_MILLIS = "avg.record.millis";
    // Class name (or alias) of the factory used to create the Kafka consumer client.
    public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
    // When true, table type and namespace are read from the job state instead of the defaults.
    // The key's spelling ("Namspace") is part of the runtime contract and must not be "corrected".
    public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION = "gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
    public static final String DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.client.Kafka08ConsumerClient$Factory";
    // Client created at the start of getWorkunits and closed before it returns.
    private GobblinKafkaConsumerClient kafkaConsumerClient;
    // Extract settings resolved once per getWorkunits call and applied to every work unit.
    private Extract.TableType tableType;
    private String extractNameSpace;
    private boolean isFullExtract;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
    public static final Extract.TableType DEFAULT_TABLE_TYPE = Extract.TableType.APPEND_ONLY;
    // Lazily filled from TOPICS_MOVE_TO_LATEST_OFFSET; ordering is case-insensitive.
    private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
    // Offsets recovered from the previous run's work unit states, keyed by partition.
    private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
    // Partitions visited while creating work units in this run; used to find skipped partitions.
    private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newConcurrentHashSet();
    // Counters written to the source state in shutdown().
    private final AtomicInteger failToGetOffsetCount = new AtomicInteger(0);
    private final AtomicInteger offsetTooEarlyCount = new AtomicInteger(0);
    private final AtomicInteger offsetTooLateCount = new AtomicInteger(0);
    private final ClassAliasResolver<GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver = new ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class);
    // Set once previousOffsets has been populated so that the load happens only once.
    private volatile boolean doneGettingAllPreviousOffsets = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaSource$Offsets.class */
    /**
     * Mutable holder for the three offsets that describe a partition's pull range:
     * the earliest and latest offsets available, and the offset to start from.
     * All three default to zero.
     */
    public static class Offsets {
        private long startOffset = 0L;
        private long earliestOffset = 0L;
        private long latestOffset = 0L;

        private Offsets() {
        }

        /**
         * Sets the start offset to the given value if it lies within
         * {@code [earliestOffset, latestOffset]}.
         *
         * @param j the requested start offset
         * @throws StartOffsetOutOfRangeException if {@code j} is outside the range; the start
         *         offset is left untouched in that case
         */
        public void startAt(long j) throws StartOffsetOutOfRangeException {
            boolean withinRange = this.earliestOffset <= j && j <= this.latestOffset;
            if (!withinRange) {
                String message = String.format("start offset = %d, earliest offset = %d, latest offset = %d", Long.valueOf(j), Long.valueOf(this.earliestOffset), Long.valueOf(this.latestOffset));
                throw new StartOffsetOutOfRangeException(message);
            }
            this.startOffset = j;
        }

        /** Starts from the earliest offset currently recorded. */
        public void startAtEarliestOffset() {
            this.startOffset = this.earliestOffset;
        }

        /** Starts from the latest offset currently recorded. */
        public void startAtLatestOffset() {
            this.startOffset = this.latestOffset;
        }

        /** @return the offset the pull will start from */
        public long getStartOffset() {
            return this.startOffset;
        }

        /** @return the earliest offset recorded for the partition */
        public long getEarliestOffset() {
            return this.earliestOffset;
        }

        /** @param j the earliest offset to record */
        public void setEarliestOffset(long j) {
            this.earliestOffset = j;
        }

        /** @return the latest offset recorded for the partition */
        public long getLatestOffset() {
            return this.latestOffset;
        }

        /** @param j the latest offset to record */
        public void setLatestOffset(long j) {
            this.latestOffset = j;
        }
    }

    /* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaSource$WorkUnitCreator.class */
    private class WorkUnitCreator implements Runnable {
        private final KafkaTopic topic;
        private final SourceState state;
        private final Optional<State> topicSpecificState;
        private final Map<String, List<WorkUnit>> allTopicWorkUnits;

        WorkUnitCreator(KafkaTopic kafkaTopic, SourceState sourceState, Optional<State> optional, Map<String, List<WorkUnit>> map) {
            this.topic = kafkaTopic;
            this.state = sourceState;
            this.topicSpecificState = optional;
            this.allTopicWorkUnits = map;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.allTopicWorkUnits.put(this.topic.getName(), KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState));
            } catch (Throwable th) {
                KafkaSource.LOG.error("Caught error in creating work unit for " + this.topic.getName(), th);
                throw th;
            }
        }
    }

    /**
     * Builds the list of work unit property names that are reported for limiter events.
     *
     * @return a new mutable list containing {@link #TOPIC_NAME} and {@link #PARTITION_ID}, in that order
     */
    private List<String> getLimiterExtractorReportKeys() {
        // Typed list instead of a raw ArrayList so the compiler checks the element type.
        List<String> reportKeys = new ArrayList<>(2);
        reportKeys.add(TOPIC_NAME);
        reportKeys.add(PARTITION_ID);
        return reportKeys;
    }

    /**
     * Writes the comma-joined report keys onto every work unit under
     * {@code limiter.report.key.list}. Does nothing when there are no keys.
     *
     * @param list the work units to annotate
     * @param list2 the report key names
     */
    private void setLimiterReportKeyListToWorkUnits(List<WorkUnit> list, List<String> list2) {
        if (!list2.isEmpty()) {
            String joinedKeys = Joiner.on(',').join(list2);
            for (WorkUnit workUnit : list) {
                workUnit.setProp("limiter.report.key.list", joinedKeys);
            }
        }
    }

    /**
     * Creates the work units for this run.
     *
     * <p>Steps: resolve table type / namespace / full-extract settings from the state, create the
     * Kafka consumer client, discover the filtered topics, build per-topic work units on a thread
     * pool, add empty work units for partitions that had a previous offset but were not visited,
     * pack the result and decorate it with topic-specific and limiter properties.
     *
     * <p>The consumer client is closed exactly once before the method exits, whether it returns
     * or throws.
     *
     * @param sourceState the job's source state
     * @return the packed work units
     * @throws RuntimeException if the consumer client factory cannot be loaded or instantiated,
     *         or if closing the consumer client fails (the {@link IOException} is the cause)
     */
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        Map<String, List<WorkUnit>> workUnitsByTopic = Maps.newConcurrentMap();
        if (sourceState.getPropAsBoolean(GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
            this.tableType = Extract.TableType.valueOf(sourceState.getProp("extract.table.type", DEFAULT_TABLE_TYPE.toString()));
            this.extractNameSpace = sourceState.getProp("extract.namespace", DEFAULT_NAMESPACE_NAME);
        } else {
            this.tableType = DEFAULT_TABLE_TYPE;
            this.extractNameSpace = DEFAULT_NAMESPACE_NAME;
        }
        this.isFullExtract = sourceState.getPropAsBoolean("extract.is.full");
        try {
            String factoryClassName = sourceState.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS);
            this.kafkaConsumerClient = ((GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory) this.kafkaConsumerClientResolver.resolveClass(factoryClassName).newInstance()).create(ConfigUtils.propertiesToConfig(sourceState.getProperties()));

            List<KafkaTopic> filteredTopics = getFilteredTopics(sourceState);
            for (KafkaTopic discoveredTopic : filteredTopics) {
                LOG.info("Discovered topic " + discoveredTopic.getName());
            }
            Map<String, State> datasetSpecificProps = DatasetUtils.getDatasetSpecificProps(Iterables.transform(filteredTopics, new Function<KafkaTopic, String>() {
                @Override
                public String apply(KafkaTopic kafkaTopic) {
                    return kafkaTopic.getName();
                }
            }), sourceState);

            // Work units are created per topic in parallel; each task writes into the concurrent map.
            ExecutorService threadPool = Executors.newFixedThreadPool(sourceState.getPropAsInt("kafka.source.work.units.creation.threads", 30), ExecutorsUtils.newThreadFactory(Optional.of(LOG)));
            Stopwatch createStarted = Stopwatch.createStarted();
            for (KafkaTopic kafkaTopic : filteredTopics) {
                threadPool.submit(new WorkUnitCreator(kafkaTopic, sourceState, Optional.fromNullable(datasetSpecificProps.get(kafkaTopic.getName())), workUnitsByTopic));
            }
            ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
            LOG.info(String.format("Created workunits for %d topics in %d seconds", Integer.valueOf(workUnitsByTopic.size()), Long.valueOf(createStarted.elapsed(TimeUnit.SECONDS))));

            createEmptyWorkUnitsForSkippedPartitions(workUnitsByTopic, datasetSpecificProps, sourceState);
            List<WorkUnit> pack = KafkaWorkUnitPacker.getInstance(this, sourceState).pack(workUnitsByTopic, sourceState.getPropAsInt("mr.job.max.mappers", 100));
            addTopicSpecificPropsToWorkUnits(pack, datasetSpecificProps);
            setLimiterReportKeyListToWorkUnits(pack, getLimiterExtractorReportKeys());
            return pack;
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            throw new RuntimeException(e);
        } finally {
            // Single close point for both the success and failure paths.
            try {
                if (this.kafkaConsumerClient != null) {
                    this.kafkaConsumerClient.close();
                }
            } catch (IOException e) {
                // Keep the IOException as the cause so the close failure can be diagnosed.
                throw new RuntimeException("Exception closing kafkaConsumerClient", e);
            }
        }
    }

    /**
     * Applies topic-specific properties to each work unit in the list.
     *
     * @param list the work units to update
     * @param map topic name to topic-specific state
     */
    private void addTopicSpecificPropsToWorkUnits(List<WorkUnit> list, Map<String, State> map) {
        for (WorkUnit workUnit : list) {
            addTopicSpecificPropsToWorkUnit(workUnit, map);
        }
    }

    /**
     * Applies topic-specific properties to one work unit. A {@link MultiWorkUnit} is
     * handled by recursing into each of its children; a plain work unit is updated only
     * when it carries {@link #TOPIC_NAME} and the map has an entry for that topic.
     *
     * @param workUnit the work unit (possibly a multi work unit) to update
     * @param map topic name to topic-specific state; may be null
     */
    private void addTopicSpecificPropsToWorkUnit(WorkUnit workUnit, Map<String, State> map) {
        if (workUnit instanceof MultiWorkUnit) {
            for (WorkUnit child : ((MultiWorkUnit) workUnit).getWorkUnits()) {
                addTopicSpecificPropsToWorkUnit(child, map);
            }
            return;
        }
        if (!workUnit.contains(TOPIC_NAME) || map == null) {
            return;
        }
        String topicName = workUnit.getProp(TOPIC_NAME);
        if (map.containsKey(topicName)) {
            workUnit.addAll(map.get(topicName));
        }
    }

    /**
     * Adds an empty work unit for every partition that has a previous offset but was not
     * visited while creating work units in this run, so that its offset is carried forward.
     *
     * @param map topic name to the work units created so far; updated in place
     * @param map2 topic name to topic-specific state
     * @param sourceState the job's source state, used to load previous offsets
     */
    private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit>> map, Map<String, State> map2, SourceState sourceState) {
        getAllPreviousOffsets(sourceState);
        for (Map.Entry<KafkaPartition, Long> previous : this.previousOffsets.entrySet()) {
            KafkaPartition partition = previous.getKey();
            if (this.partitionsToBeProcessed.contains(partition)) {
                continue;
            }
            Optional<State> topicState = Optional.fromNullable(map2.get(partition.getTopicName()));
            WorkUnit emptyWorkUnit = createEmptyWorkUnit(partition, previous.getValue().longValue(), topicState);
            String topicName = partition.getTopicName();
            if (!map.containsKey(topicName)) {
                map.put(topicName, Lists.newArrayList(emptyWorkUnit));
            } else {
                map.get(topicName).add(emptyWorkUnit);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the work units for all partitions of a topic. Every partition is recorded as
     * visited, whether or not a work unit was produced for it. When the topic is not
     * qualified, each produced work unit is turned into a no-op via {@code skipWorkUnit}.
     *
     * @param kafkaTopic the topic to process
     * @param sourceState the job's source state
     * @param optional topic-specific state, if any
     * @return the work units created for the topic (possibly empty)
     */
    public List<WorkUnit> getWorkUnitsForTopic(KafkaTopic kafkaTopic, SourceState sourceState, Optional<State> optional) {
        boolean qualified = isTopicQualified(kafkaTopic);
        List<WorkUnit> topicWorkUnits = Lists.newArrayList();
        for (KafkaPartition partition : kafkaTopic.getPartitions()) {
            WorkUnit workUnit = getWorkUnitForTopicPartition(partition, sourceState, optional);
            this.partitionsToBeProcessed.add(partition);
            if (workUnit == null) {
                continue;
            }
            if (!qualified) {
                skipWorkUnit(workUnit);
            }
            topicWorkUnits.add(workUnit);
        }
        return topicWorkUnits;
    }

    /**
     * Extension hook deciding whether a topic's work units should actually pull data.
     * This base implementation accepts every topic; when an override returns false, the
     * work units created for the topic have their high watermark set to their low watermark.
     *
     * @param kafkaTopic the topic being considered
     * @return {@code true} in this implementation
     */
    protected boolean isTopicQualified(KafkaTopic kafkaTopic) {
        return true;
    }

    /**
     * Sets the work unit's high watermark property to its current low watermark.
     *
     * @param workUnit the work unit to update
     */
    private static void skipWorkUnit(WorkUnit workUnit) {
        Long lowWatermark = Long.valueOf(workUnit.getLowWaterMark());
        workUnit.setProp("workunit.high.water.mark", lowWatermark);
    }

    /**
     * Creates the work unit for one partition, deciding where the pull should start.
     *
     * <ul>
     *   <li>If the earliest/latest offsets cannot be fetched, the partition is skipped: an empty
     *       work unit is returned when a previous offset exists, otherwise {@code null}.</li>
     *   <li>If the topic is configured to move to the latest offset, start there.</li>
     *   <li>If there is no previous offset, follow {@link #BOOTSTRAP_WITH_OFFSET}
     *       (latest, earliest, or anything else to skip and return {@code null}).</li>
     *   <li>Otherwise start at the previous offset; if it is out of range, follow
     *       {@link #RESET_ON_OFFSET_OUT_OF_RANGE} (latest, earliest, nearest, or anything else
     *       to skip with an empty work unit that preserves the previous offset).</li>
     * </ul>
     *
     * @param kafkaPartition the partition to create a work unit for
     * @param sourceState the job's source state
     * @param optional topic-specific state, if any
     * @return the work unit, or {@code null} when the partition is skipped and there is no
     *         previous offset to carry forward
     */
    private WorkUnit getWorkUnitForTopicPartition(KafkaPartition kafkaPartition, SourceState sourceState, Optional<State> optional) {
        Offsets offsets = new Offsets();

        KafkaOffsetRetrievalFailureException offsetRetrievalFailure = null;
        try {
            offsets.setEarliestOffset(this.kafkaConsumerClient.getEarliestOffset(kafkaPartition));
            offsets.setLatestOffset(this.kafkaConsumerClient.getLatestOffset(kafkaPartition));
        } catch (KafkaOffsetRetrievalFailureException e) {
            offsetRetrievalFailure = e;
        }

        long previousOffset = 0;
        boolean previousOffsetNotFound = false;
        try {
            previousOffset = getPreviousOffsetForPartition(kafkaPartition, sourceState);
        } catch (PreviousOffsetNotFoundException e) {
            previousOffsetNotFound = true;
        }

        if (offsetRetrievalFailure != null) {
            this.failToGetOffsetCount.incrementAndGet();
            // Include the cause so the reason for the skipped partition is not lost.
            LOG.warn(String.format("Failed to retrieve earliest and/or latest offset for partition %s. This partition will be skipped.", kafkaPartition), offsetRetrievalFailure);
            if (previousOffsetNotFound) {
                return null;
            }
            return createEmptyWorkUnit(kafkaPartition, previousOffset, optional);
        }

        if (shouldMoveToLatestOffset(kafkaPartition, sourceState)) {
            offsets.startAtLatestOffset();
        } else if (previousOffsetNotFound) {
            String offsetNotFoundMsg = String.format("Previous offset for partition %s does not exist. ", kafkaPartition);
            // equalsIgnoreCase avoids the default-locale pitfalls of toLowerCase() (e.g. Turkish dotless i).
            String bootstrapOption = sourceState.getProp(BOOTSTRAP_WITH_OFFSET, DEFAULT_BOOTSTRAP_WITH_OFFSET);
            if (LATEST_OFFSET.equalsIgnoreCase(bootstrapOption)) {
                LOG.warn(offsetNotFoundMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
                offsets.startAtLatestOffset();
            } else if (EARLIEST_OFFSET.equalsIgnoreCase(bootstrapOption)) {
                LOG.warn(offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
                offsets.startAtEarliestOffset();
            } else {
                LOG.warn(offsetNotFoundMsg + "This partition will be skipped.");
                return null;
            }
        } else {
            try {
                offsets.startAt(previousOffset);
            } catch (StartOffsetOutOfRangeException e) {
                // startAt() leaves the start offset unset when it throws, so every decision below
                // must be based on previousOffset rather than offsets.getStartOffset().
                if (previousOffset <= offsets.getLatestOffset()) {
                    this.offsetTooEarlyCount.incrementAndGet();
                } else {
                    this.offsetTooLateCount.incrementAndGet();
                }
                String offsetOutOfRangeMsg = String.format("Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.", kafkaPartition, Long.valueOf(previousOffset), Long.valueOf(offsets.getEarliestOffset()), Long.valueOf(offsets.getLatestOffset()));
                String resetOption = sourceState.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE);
                boolean resetToNearest = NEAREST_OFFSET.equalsIgnoreCase(resetOption);
                if (LATEST_OFFSET.equalsIgnoreCase(resetOption) || (resetToNearest && previousOffset >= offsets.getLatestOffset())) {
                    LOG.warn(offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
                    offsets.startAtLatestOffset();
                } else if (EARLIEST_OFFSET.equalsIgnoreCase(resetOption) || resetToNearest) {
                    LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
                    offsets.startAtEarliestOffset();
                } else {
                    // Skipping still needs an empty work unit so the previous offset is persisted.
                    LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
                    return createEmptyWorkUnit(kafkaPartition, previousOffset, optional);
                }
            }
        }
        return getWorkUnitForTopicPartition(kafkaPartition, offsets, optional);
    }

    /**
     * Looks up the offset recorded for the partition by the previous run.
     *
     * @param kafkaPartition the partition to look up
     * @param sourceState the job's source state, used to load previous offsets on first use
     * @return the previous offset
     * @throws PreviousOffsetNotFoundException if no offset is recorded for the partition
     */
    private long getPreviousOffsetForPartition(KafkaPartition kafkaPartition, SourceState sourceState) throws PreviousOffsetNotFoundException {
        getAllPreviousOffsets(sourceState);
        if (!this.previousOffsets.containsKey(kafkaPartition)) {
            String message = String.format("Previous offset for topic %s, partition %s not found.", kafkaPartition.getTopicName(), Integer.valueOf(kafkaPartition.getId()));
            throw new PreviousOffsetNotFoundException(message);
        }
        return this.previousOffsets.get(kafkaPartition).longValue();
    }

    /**
     * Populates {@code previousOffsets} from the previous run's work unit states. The work is
     * done only on the first call; later calls return immediately. Watermark entries equal to
     * -1 are not recorded.
     *
     * @param sourceState the job's source state holding the previous work unit states
     * @throws IllegalArgumentException if a state's partition count differs from its watermark count
     */
    private synchronized void getAllPreviousOffsets(SourceState sourceState) {
        if (!this.doneGettingAllPreviousOffsets) {
            this.previousOffsets.clear();
            for (WorkUnitState previousState : sourceState.getPreviousWorkUnitStates()) {
                List<KafkaPartition> partitions = KafkaUtils.getPartitions(previousState);
                MultiLongWatermark highWatermark = (MultiLongWatermark) previousState.getActualHighWatermark(MultiLongWatermark.class);
                Preconditions.checkArgument(partitions.size() == highWatermark.size(), String.format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions, highWatermark));
                int index = 0;
                while (index < partitions.size()) {
                    if (highWatermark.get(index) != -1) {
                        this.previousOffsets.put(partitions.get(index), Long.valueOf(highWatermark.get(index)));
                    }
                    index++;
                }
            }
            this.doneGettingAllPreviousOffsets = true;
        }
    }

    /**
     * Tells whether the partition's topic is configured to jump to the latest offset. The
     * configured topic list is parsed (comma-separated, trimmed, empty entries dropped) while
     * the cached set is empty; the entry {@link #ALL_TOPICS} matches every topic.
     *
     * @param kafkaPartition the partition whose topic is checked
     * @param sourceState the job's source state
     * @return {@code true} if the topic, or all topics, should move to the latest offset
     */
    private synchronized boolean shouldMoveToLatestOffset(KafkaPartition kafkaPartition, SourceState sourceState) {
        if (sourceState.contains(TOPICS_MOVE_TO_LATEST_OFFSET)) {
            if (this.moveToLatestTopics.isEmpty()) {
                String configuredTopics = sourceState.getProp(TOPICS_MOVE_TO_LATEST_OFFSET);
                this.moveToLatestTopics.addAll(Splitter.on(',').trimResults().omitEmptyStrings().splitToList(configuredTopics));
            }
            if (this.moveToLatestTopics.contains(kafkaPartition.getTopicName())) {
                return true;
            }
            return this.moveToLatestTopics.contains(ALL_TOPICS);
        }
        return false;
    }

    /**
     * Creates a work unit whose earliest, latest and start offsets all equal the given
     * offset, i.e. a work unit with an empty pull range.
     *
     * @param kafkaPartition the partition the work unit is for
     * @param j the offset used for all three positions
     * @param optional topic-specific state, if any
     * @return the empty work unit
     */
    private WorkUnit createEmptyWorkUnit(KafkaPartition kafkaPartition, long j, Optional<State> optional) {
        Offsets emptyRange = new Offsets();
        emptyRange.setLatestOffset(j);
        emptyRange.setEarliestOffset(j);
        emptyRange.startAtEarliestOffset();
        return getWorkUnitForTopicPartition(kafkaPartition, emptyRange, optional);
    }

    /**
     * Builds the work unit for a partition from already-resolved offsets: creates the extract,
     * copies topic-specific state if present, then writes topic, partition, leader and
     * watermark properties (low = start offset, high = latest offset).
     *
     * @param kafkaPartition the partition the work unit is for
     * @param offsets the resolved offsets
     * @param optional topic-specific state, if any
     * @return the populated work unit
     */
    private WorkUnit getWorkUnitForTopicPartition(KafkaPartition kafkaPartition, Offsets offsets, Optional<State> optional) {
        Extract extract = createExtract(this.tableType, this.extractNameSpace, kafkaPartition.getTopicName());
        if (this.isFullExtract) {
            extract.setProp("extract.is.full", true);
        }

        WorkUnit workUnit = WorkUnit.create(extract);
        if (optional.isPresent()) {
            workUnit.addAll((State) optional.get());
        }

        // Identity of the partition and its current leader.
        workUnit.setProp(TOPIC_NAME, kafkaPartition.getTopicName());
        workUnit.setProp("extract.table.name", kafkaPartition.getTopicName());
        workUnit.setProp(PARTITION_ID, Integer.valueOf(kafkaPartition.getId()));
        workUnit.setProp(LEADER_ID, Integer.valueOf(kafkaPartition.getLeader().getId()));
        workUnit.setProp(LEADER_HOSTANDPORT, kafkaPartition.getLeader().getHostAndPort().toString());

        // Pull range.
        workUnit.setProp("workunit.low.water.mark", Long.valueOf(offsets.getStartOffset()));
        workUnit.setProp("workunit.high.water.mark", Long.valueOf(offsets.getLatestOffset()));

        String summary = String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", kafkaPartition, Long.valueOf(offsets.getStartOffset()), Long.valueOf(offsets.getLatestOffset()), Long.valueOf(offsets.getLatestOffset() - offsets.getStartOffset()));
        LOG.info(summary);
        return workUnit;
    }

    /**
     * Asks the consumer client for the topics that pass the configured blacklist and
     * whitelist patterns.
     *
     * @param sourceState the job's source state holding the pattern lists
     * @return the topics returned by the consumer client
     */
    private List<KafkaTopic> getFilteredTopics(SourceState sourceState) {
        GobblinKafkaConsumerClient client = this.kafkaConsumerClient;
        return client.getFilteredTopics(
                DatasetFilterUtils.getPatternList(sourceState, TOPIC_BLACKLIST),
                DatasetFilterUtils.getPatternList(sourceState, TOPIC_WHITELIST));
    }

    /**
     * Writes the offset-related counters collected while creating work units onto the
     * source state: how many partitions had a previous offset that was too early or too
     * late, and how many partitions failed offset retrieval.
     *
     * @param sourceState the state that receives the counter properties
     */
    public void shutdown(SourceState sourceState) {
        // The AtomicInteger instances themselves are passed as the property values.
        sourceState.setProp("offset.too.early.count", this.offsetTooEarlyCount);
        sourceState.setProp("offset.too.late.count", this.offsetTooLateCount);
        sourceState.setProp("fail.to.get.offset.count", this.failToGetOffsetCount);
    }
}
