package gobblin.source.extractor.extract.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import gobblin.configuration.SourceState;
import gobblin.configuration.State;
import gobblin.configuration.StateUtils;
import gobblin.configuration.WorkUnitState;
import gobblin.source.extractor.extract.EventBasedSource;
import gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
import gobblin.source.workunit.Extract;
import gobblin.source.workunit.WorkUnit;
import gobblin.util.DatasetFilterUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaSource.class */
public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
    // Property keys holding the topic blacklist / whitelist patterns used when listing topics.
    public static final String TOPIC_BLACKLIST = "topic.blacklist";
    public static final String TOPIC_WHITELIST = "topic.whitelist";
    // Recognised values for the offset-selection properties below.
    public static final String LATEST_OFFSET = "latest";
    public static final String EARLIEST_OFFSET = "earliest";
    public static final String NEAREST_OFFSET = "nearest";
    // Where to start a partition that has no previous offset ("latest" or "earliest"; any other value skips it).
    public static final String BOOTSTRAP_WITH_OFFSET = "bootstrap.with.offset";
    public static final String DEFAULT_BOOTSTRAP_WITH_OFFSET = "latest";
    // Comma-separated topic names (or "all") whose partitions always start from the latest offset.
    public static final String TOPICS_MOVE_TO_LATEST_OFFSET = "topics.move.to.latest.offset";
    // What to do when the previous offset is outside the available range
    // ("latest", "earliest" or "nearest"; any other value skips the partition).
    public static final String RESET_ON_OFFSET_OUT_OF_RANGE = "reset.on.offset.out.of.range";
    public static final String DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE = "nearest";
    // Work unit property keys describing the partition a work unit covers.
    public static final String TOPIC_NAME = "topic.name";
    public static final String PARTITION_ID = "partition.id";
    public static final String LEADER_ID = "leader.id";
    public static final String LEADER_HOSTANDPORT = "leader.hostandport";
    // Namespace passed to createExtract for every work unit.
    public static final String DEFAULT_NAMESPACE_NAME = "KAFKA";
    // Special entry of TOPICS_MOVE_TO_LATEST_OFFSET that matches every topic.
    public static final String ALL_TOPICS = "all";
    // Not referenced within this class; presumably consumed by extractors/packers — TODO confirm.
    public static final String AVG_RECORD_SIZE = "avg.record.size";
    public static final String AVG_RECORD_MILLIS = "avg.record.millis";

    // Property holding a JSON array of per-topic overrides; each object is matched to topics via its "topic.name" regex.
    @VisibleForTesting
    static final String KAFKA_TOPIC_SPECIFIC_STATE = "kafka.topic.specific.state";
    // Created in getWorkunits and registered with the closer so that shutdown releases it.
    private KafkaWrapper kafkaWrapper;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
    private static final Gson GSON = new Gson();
    // Table type passed to createExtract for every work unit.
    public static final Extract.TableType DEFAULT_TABLE_TYPE = Extract.TableType.APPEND_ONLY;
    // Lazily filled from TOPICS_MOVE_TO_LATEST_OFFSET; lookups are case-insensitive.
    private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
    // Offset recorded for each partition by the previous run's work unit states.
    private final Map<KafkaPartition, Long> previousOffsets = Maps.newHashMap();
    // Partitions visited while building work units in this run.
    private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newHashSet();
    private Closer closer = Closer.create();
    // Counters written to the source state in shutdown.
    private int failToGetOffsetCount = 0;
    private int offsetTooEarlyCount = 0;
    private int offsetTooLateCount = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaSource$KafkaTopicNamePredicate.class */
    /**
     * Guava predicate that accepts a {@link KafkaTopic} when its name matches a
     * regular expression in full (not merely as a substring).
     */
    public static class KafkaTopicNamePredicate implements Predicate<KafkaTopic> {
        /** Compiled once at construction and reused for every topic tested. */
        private final Pattern topicNamePattern;

        /**
         * @param str regular expression the whole topic name has to match
         */
        public KafkaTopicNamePredicate(String str) {
            this.topicNamePattern = Pattern.compile(str);
        }

        /**
         * @param kafkaTopic topic whose name is tested
         * @return {@code true} if the entire topic name matches the pattern
         */
        public boolean apply(KafkaTopic kafkaTopic) {
            final String candidateName = kafkaTopic.getName();
            final boolean fullMatch = this.topicNamePattern.matcher(candidateName).matches();
            return fullMatch;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaSource$Offsets.class */
    /**
     * Mutable holder for the offsets that define one partition's work unit: the
     * earliest and latest offsets reported for the partition and the offset at
     * which pulling should start. All three default to 0.
     */
    public static class Offsets {
        private long startOffset;
        private long earliestOffset;
        private long latestOffset;

        private Offsets() {
            this.startOffset = 0L;
            this.earliestOffset = 0L;
            this.latestOffset = 0L;
        }

        /**
         * Sets the start offset to an explicit value.
         *
         * <p>The value must lie in {@code [earliestOffset, latestOffset]}. A start offset beyond
         * {@code latestOffset} would yield a work unit whose low watermark exceeds its high
         * watermark, so it is rejected. On rejection the current start offset is left unchanged.
         *
         * @param offset the desired start offset
         * @throws StartOffsetOutOfRangeException if {@code offset} is outside the range above
         */
        public void startAt(long offset) throws StartOffsetOutOfRangeException {
            if (offset < this.earliestOffset || offset > this.latestOffset) {
                throw new StartOffsetOutOfRangeException(String.format(
                        "start offset = %d, earliest offset = %d, latest offset = %d",
                        offset, this.earliestOffset, this.latestOffset));
            }
            this.startOffset = offset;
        }

        /** Starts from the current earliest offset. */
        public void startAtEarliestOffset() {
            this.startOffset = this.earliestOffset;
        }

        /** Starts from the current latest offset. */
        public void startAtLatestOffset() {
            this.startOffset = this.latestOffset;
        }

        public long getStartOffset() {
            return this.startOffset;
        }

        public long getEarliestOffset() {
            return this.earliestOffset;
        }

        public void setEarliestOffset(long offset) {
            this.earliestOffset = offset;
        }

        public long getLatestOffset() {
            return this.latestOffset;
        }

        public void setLatestOffset(long offset) {
            this.latestOffset = offset;
        }
    }

    /**
     * Creates the work units for this run.
     *
     * <p>Opens a {@link KafkaWrapper} (registered with the closer so {@code shutdown} releases it),
     * builds the work units of every filtered topic, adds empty work units for partitions that have
     * a previous offset but were not visited, and hands everything to the work unit packer.
     *
     * @param sourceState job-level state holding the configuration and the previous work unit states
     * @return the packed work units
     */
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        this.kafkaWrapper = this.closer.register(KafkaWrapper.create(sourceState));

        List<KafkaTopic> topics = getFilteredTopics(sourceState);
        Map<String, State> stateByTopic = getTopicSpecificState(topics, sourceState);

        Map<String, List<WorkUnit>> workUnitsByTopic = Maps.newHashMap();
        for (KafkaTopic topic : topics) {
            String topicName = topic.getName();
            Optional<State> topicState = Optional.fromNullable(stateByTopic.get(topicName));
            workUnitsByTopic.put(topicName, getWorkUnitsForTopic(topic, sourceState, topicState));
        }

        // Partitions known from the previous run but not seen now still need their offsets carried forward.
        createEmptyWorkUnitsForSkippedPartitions(workUnitsByTopic, stateByTopic);

        KafkaWorkUnitPacker packer = KafkaWorkUnitPacker.getInstance(this, sourceState);
        int maxMappers = sourceState.getPropAsInt("mr.job.max.mappers", 100);
        return packer.pack(workUnitsByTopic, maxMappers);
    }

    /**
     * Parses the {@code kafka.topic.specific.state} property into per-topic state.
     *
     * <p>The property is read as a JSON array of objects. Each object's {@code topic.name} field is
     * treated as a regular expression; every topic in {@code list} whose name matches it in full
     * receives the object's remaining fields as state. When several objects match the same topic
     * their states are merged in array order. Objects without a {@code topic.name} field are
     * skipped with a warning.
     *
     * @param list        topics that are candidates for topic-specific state
     * @param sourceState state carrying the property
     * @return map from topic name to its state; empty if the property is missing or empty
     * @throws IllegalArgumentException if an array element is not a JSON object or
     *                                  {@code topic.name} is not a JSON primitive
     */
    @VisibleForTesting
    Map<String, State> getTopicSpecificState(List<KafkaTopic> list, SourceState sourceState) {
        if (Strings.isNullOrEmpty(sourceState.getProp(KAFKA_TOPIC_SPECIFIC_STATE))) {
            return Maps.newHashMap();
        }

        Map<String, State> stateByTopic = Maps.newHashMap();
        for (JsonElement element : sourceState.getPropAsJsonArray(KAFKA_TOPIC_SPECIFIC_STATE)) {
            Preconditions.checkArgument(element.isJsonObject(), "The value for property kafka.topic.specific.state is malformed");
            JsonObject entry = element.getAsJsonObject();

            if (!entry.has(TOPIC_NAME)) {
                LOG.warn("Skipping JsonElement " + element + " as it is does not contain a field with key " + TOPIC_NAME);
                continue;
            }

            JsonElement topicNameElement = entry.get(TOPIC_NAME);
            Preconditions.checkArgument(topicNameElement.isJsonPrimitive(), "The value for property kafka.topic.specific.state is malformed, the topic.name field must be a string");

            Predicate<KafkaTopic> nameMatches = new KafkaTopicNamePredicate(topicNameElement.getAsString());
            for (KafkaTopic topic : Iterables.filter(list, nameMatches)) {
                String topicName = topic.getName();
                // A fresh State per topic: sharing one instance would let a later merge leak into other topics.
                State parsed = StateUtils.jsonObjectToState(entry, new String[]{TOPIC_NAME});
                if (stateByTopic.containsKey(topicName)) {
                    stateByTopic.get(topicName).addAll(parsed);
                } else {
                    stateByTopic.put(topicName, parsed);
                }
            }
        }
        return stateByTopic;
    }

    /**
     * Adds an empty work unit for every partition that has a previous offset but was not visited
     * while building this run's work units, so that its offset is carried into the next state.
     *
     * @param workUnitsByTopic work units grouped by topic name; updated in place
     * @param topicStates      topic-specific state by topic name, applied to the new work units
     */
    private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit>> workUnitsByTopic, Map<String, State> topicStates) {
        for (Map.Entry<KafkaPartition, Long> previous : this.previousOffsets.entrySet()) {
            KafkaPartition partition = previous.getKey();
            if (this.partitionsToBeProcessed.contains(partition)) {
                continue;
            }

            Optional<State> topicState = Optional.fromNullable(topicStates.get(partition.getTopicName()));
            WorkUnit emptyWorkUnit = createEmptyWorkUnit(partition, previous.getValue().longValue(), topicState);

            String topicName = partition.getTopicName();
            if (!workUnitsByTopic.containsKey(topicName)) {
                List<WorkUnit> fresh = Lists.newArrayList();
                fresh.add(emptyWorkUnit);
                workUnitsByTopic.put(topicName, fresh);
            } else {
                workUnitsByTopic.get(topicName).add(emptyWorkUnit);
            }
        }
    }

    /**
     * Builds one work unit per partition of the given topic.
     *
     * <p>Every partition is recorded as visited, including those for which no work unit could be
     * created. If the topic is not qualified, each work unit is turned into a no-op by setting its
     * high watermark equal to its low watermark.
     *
     * @param topic       topic whose partitions are processed
     * @param sourceState job-level state
     * @param topicState  optional topic-specific state copied into each work unit
     * @return the work units created for the topic; never {@code null}
     */
    private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState sourceState, Optional<State> topicState) {
        final boolean qualified = isTopicQualified(topic);
        List<WorkUnit> workUnits = Lists.newArrayList();

        for (KafkaPartition partition : topic.getPartitions()) {
            WorkUnit workUnit = getWorkUnitForTopicPartition(partition, sourceState, topicState);
            this.partitionsToBeProcessed.add(partition);
            if (workUnit == null) {
                continue;
            }
            if (!qualified) {
                skipWorkUnit(workUnit);
            }
            workUnits.add(workUnit);
        }
        return workUnits;
    }

    /**
     * Extension hook deciding whether a topic should actually be pulled in this run.
     * Work units of a topic for which this returns {@code false} are still created but have
     * their high watermark set to their low watermark. This base implementation accepts every topic.
     *
     * @param kafkaTopic the topic under consideration
     * @return {@code true} always, in this base implementation
     */
    protected boolean isTopicQualified(KafkaTopic kafkaTopic) {
        return true;
    }

    /**
     * Turns a work unit into a no-op by setting its high watermark to its current low watermark.
     *
     * @param workUnit the work unit to neutralise; modified in place
     */
    private void skipWorkUnit(WorkUnit workUnit) {
        final Long lowWatermark = Long.valueOf(workUnit.getLowWaterMark());
        workUnit.setProp("workunit.high.water.mark", lowWatermark);
    }

    /**
     * Creates the work unit for a single partition, deciding where pulling should start.
     *
     * <ol>
     *   <li>If the earliest/latest offsets cannot be retrieved, the partition is skipped: an empty
     *       work unit carrying the previous offset is returned, or {@code null} when there is no
     *       previous offset.</li>
     *   <li>If the topic is configured in {@code topics.move.to.latest.offset}, start at the latest
     *       offset.</li>
     *   <li>If there is no previous offset, follow {@code bootstrap.with.offset}: "latest",
     *       "earliest", or (any other value) skip the partition by returning {@code null}.</li>
     *   <li>Otherwise start at the previous offset. If it is out of range, follow
     *       {@code reset.on.offset.out.of.range}: "latest", "earliest", "nearest", or (any other
     *       value) skip the partition with an empty work unit that preserves the previous offset.</li>
     * </ol>
     *
     * @param partition          the partition to create a work unit for
     * @param sourceState        job-level state with configuration and previous work unit states
     * @param topicSpecificState optional topic-specific state copied into the work unit
     * @return the work unit, or {@code null} if the partition is skipped and has no offset to preserve
     */
    private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceState sourceState, Optional<State> topicSpecificState) {
        Offsets offsets = new Offsets();

        // Keep the exception itself (not just a flag) so the cause is not lost when we log below.
        KafkaOffsetRetrievalFailureException offsetRetrievalFailure = null;
        try {
            offsets.setEarliestOffset(this.kafkaWrapper.getEarliestOffset(partition));
            offsets.setLatestOffset(this.kafkaWrapper.getLatestOffset(partition));
        } catch (KafkaOffsetRetrievalFailureException e) {
            offsetRetrievalFailure = e;
        }

        long previousOffset = 0;
        boolean previousOffsetNotFound = false;
        try {
            previousOffset = getPreviousOffsetForPartition(partition, sourceState);
        } catch (PreviousOffsetNotFoundException ignored) {
            // Expected for partitions that have no previous work unit state.
            previousOffsetNotFound = true;
        }

        if (offsetRetrievalFailure != null) {
            this.failToGetOffsetCount++;
            LOG.warn(String.format("Failed to retrieve earliest and/or latest offset for partition %s. This partition will be skipped.", partition),
                    offsetRetrievalFailure);
            // An empty work unit keeps the previous offset alive; without one there is nothing to keep.
            return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
        }

        if (shouldMoveToLatestOffset(partition, sourceState)) {
            offsets.startAtLatestOffset();
        } else if (previousOffsetNotFound) {
            String notFoundMsg = String.format("Previous offset for partition %s does not exist. ", partition);
            // equalsIgnoreCase avoids the default-locale dependence of toLowerCase() (e.g. "EARLIEST" under a Turkish locale).
            String bootstrapOption = sourceState.getProp(BOOTSTRAP_WITH_OFFSET, DEFAULT_BOOTSTRAP_WITH_OFFSET);
            if (LATEST_OFFSET.equalsIgnoreCase(bootstrapOption)) {
                LOG.warn(notFoundMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
                offsets.startAtLatestOffset();
            } else if (EARLIEST_OFFSET.equalsIgnoreCase(bootstrapOption)) {
                LOG.warn(notFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
                offsets.startAtEarliestOffset();
            } else {
                LOG.warn(notFoundMsg + "This partition will be skipped.");
                return null;
            }
        } else {
            try {
                offsets.startAt(previousOffset);
            } catch (StartOffsetOutOfRangeException e) {
                // startAt() leaves the holder's start offset untouched (still 0) when it throws, so all
                // decisions and messages below must use the rejected previousOffset instead.
                if (previousOffset <= offsets.getLatestOffset()) {
                    this.offsetTooEarlyCount++;
                } else {
                    this.offsetTooLateCount++;
                }

                String outOfRangeMsg = String.format("Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
                        partition, previousOffset, offsets.getEarliestOffset(), offsets.getLatestOffset());
                String resetOption = sourceState.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE);
                boolean nearest = NEAREST_OFFSET.equalsIgnoreCase(resetOption);

                if (LATEST_OFFSET.equalsIgnoreCase(resetOption) || (nearest && previousOffset >= offsets.getLatestOffset())) {
                    LOG.warn(outOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
                    offsets.startAtLatestOffset();
                } else if (EARLIEST_OFFSET.equalsIgnoreCase(resetOption) || nearest) {
                    LOG.warn(outOfRangeMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
                    offsets.startAtEarliestOffset();
                } else {
                    LOG.warn(outOfRangeMsg + "This partition will be skipped.");
                    // Empty work unit so that the previous offset is persisted for the next run.
                    return createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
                }
            }
        }
        return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
    }

    /**
     * Looks up the offset recorded for a partition by the previous run.
     *
     * <p>The offsets are loaded from the previous work unit states when the cache is empty.
     * NOTE(review): if the previous run recorded no offsets at all, the cache stays empty and the
     * previous states are re-scanned on every call.
     *
     * @param partition   partition to look up
     * @param sourceState state giving access to the previous work unit states
     * @return the previous offset of the partition
     * @throws PreviousOffsetNotFoundException if no offset is recorded for the partition
     */
    private long getPreviousOffsetForPartition(KafkaPartition partition, SourceState sourceState) throws PreviousOffsetNotFoundException {
        if (this.previousOffsets.isEmpty()) {
            getAllPreviousOffsets(sourceState);
        }

        final Long recorded = this.previousOffsets.get(partition);
        if (recorded == null) {
            throw new PreviousOffsetNotFoundException(String.format("Previous offset for topic %s, partition %s not found.", partition.getTopicName(), Integer.valueOf(partition.getId())));
        }
        return recorded.longValue();
    }

    /**
     * Rebuilds the partition-to-offset cache from the previous run's work unit states.
     *
     * <p>Each state contributes a list of partitions and a watermark with one value per partition;
     * values equal to {@code -1} are ignored.
     *
     * @param sourceState state giving access to the previous work unit states
     * @throws IllegalArgumentException if a state's partition count differs from its watermark count
     */
    private void getAllPreviousOffsets(SourceState sourceState) {
        this.previousOffsets.clear();
        for (WorkUnitState previousState : sourceState.getPreviousWorkUnitStates()) {
            List<KafkaPartition> partitions = KafkaUtils.getPartitions(previousState);
            MultiLongWatermark watermark = getWatermark(previousState);
            Preconditions.checkArgument(partitions.size() == watermark.size(), String.format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions, watermark));

            final int partitionCount = partitions.size();
            for (int idx = 0; idx < partitionCount; idx++) {
                long offset = watermark.get(idx);
                if (offset == -1) {
                    continue;
                }
                this.previousOffsets.put(partitions.get(idx), Long.valueOf(offset));
            }
        }
    }

    /**
     * Extracts the watermark of a previous work unit state, preferring the actual high watermark
     * and falling back to the work unit's low watermark.
     *
     * @param workUnitState the previous work unit state
     * @return the deserialized watermark
     * @throws IllegalArgumentException if the state has neither watermark
     */
    private MultiLongWatermark getWatermark(WorkUnitState workUnitState) {
        if (workUnitState.getActualHighWatermark() == null) {
            if (workUnitState.getWorkunit().getLowWatermark() == null) {
                throw new IllegalArgumentException(String.format("workUnitState %s doesn't have either actual high watermark or low watermark", workUnitState));
            }
            return GSON.fromJson(workUnitState.getWorkunit().getLowWatermark(), MultiLongWatermark.class);
        }
        return GSON.fromJson(workUnitState.getActualHighWatermark(), MultiLongWatermark.class);
    }

    /**
     * Tells whether a partition must start from the latest offset regardless of its previous offset.
     *
     * <p>This is the case when {@code topics.move.to.latest.offset} lists the partition's topic or
     * the special value {@code all}. The comma-separated list is parsed when the cached set is
     * empty; lookups in that set are case-insensitive.
     *
     * @param partition   partition being considered
     * @param sourceState state carrying the property
     * @return {@code true} if the partition should start from the latest offset
     */
    private boolean shouldMoveToLatestOffset(KafkaPartition partition, SourceState sourceState) {
        if (sourceState.contains(TOPICS_MOVE_TO_LATEST_OFFSET)) {
            if (this.moveToLatestTopics.isEmpty()) {
                List<String> configuredTopics = Splitter.on(',')
                        .trimResults()
                        .omitEmptyStrings()
                        .splitToList(sourceState.getProp(TOPICS_MOVE_TO_LATEST_OFFSET));
                this.moveToLatestTopics.addAll(configuredTopics);
            }
            if (this.moveToLatestTopics.contains(partition.getTopicName())) {
                return true;
            }
            return this.moveToLatestTopics.contains(ALL_TOPICS);
        }
        return false;
    }

    /**
     * Creates a work unit that pulls nothing: its earliest, latest and start offsets all equal the
     * given offset, so low and high watermark coincide.
     *
     * @param partition  partition the work unit belongs to
     * @param offset     offset to record as both low and high watermark
     * @param topicState optional topic-specific state copied into the work unit
     * @return the empty work unit
     */
    private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long offset, Optional<State> topicState) {
        Offsets emptyRange = new Offsets();
        emptyRange.setLatestOffset(offset);
        emptyRange.setEarliestOffset(offset);
        // Must follow setEarliestOffset: it copies the earliest offset into the start offset.
        emptyRange.startAtEarliestOffset();
        return getWorkUnitForTopicPartition(partition, emptyRange, topicState);
    }

    /**
     * Materialises a work unit for a partition from already-resolved offsets.
     *
     * <p>Topic-specific state (if any) is copied in first, so the partition properties and
     * watermarks set afterwards take precedence over it. The low watermark is the start offset and
     * the high watermark is the latest offset.
     *
     * @param partition  partition the work unit covers
     * @param offsets    resolved start and latest offsets
     * @param topicState optional topic-specific state copied into the work unit
     * @return the populated work unit
     */
    private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets offsets, Optional<State> topicState) {
        WorkUnit workUnit = WorkUnit.create(createExtract(DEFAULT_TABLE_TYPE, DEFAULT_NAMESPACE_NAME, partition.getTopicName()));
        if (topicState.isPresent()) {
            workUnit.addAll(topicState.get());
        }

        // Identity of the partition and its current leader.
        workUnit.setProp(TOPIC_NAME, partition.getTopicName());
        workUnit.setProp("extract.table.name", partition.getTopicName());
        workUnit.setProp(PARTITION_ID, Integer.valueOf(partition.getId()));
        workUnit.setProp(LEADER_ID, Integer.valueOf(partition.getLeader().getId()));
        workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());

        // Offset range to pull.
        final long lowWatermark = offsets.getStartOffset();
        final long highWatermark = offsets.getLatestOffset();
        workUnit.setProp("workunit.low.water.mark", Long.valueOf(lowWatermark));
        workUnit.setProp("workunit.high.water.mark", Long.valueOf(highWatermark));

        LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition, Long.valueOf(lowWatermark), Long.valueOf(highWatermark), Long.valueOf(highWatermark - lowWatermark)));
        return workUnit;
    }

    /**
     * Asks the Kafka wrapper for the topics that pass the configured blacklist and whitelist.
     *
     * @param sourceState state carrying the {@code topic.blacklist} / {@code topic.whitelist} properties
     * @return the topics returned by the wrapper for those patterns
     */
    private List<KafkaTopic> getFilteredTopics(SourceState sourceState) {
        List<Pattern> blacklist = getBlacklist(sourceState);
        List<Pattern> whitelist = getWhitelist(sourceState);
        return this.kafkaWrapper.getFilteredTopics(blacklist, whitelist);
    }

    /**
     * Reads the {@code topic.blacklist} property (empty when absent) and converts its entries to patterns.
     *
     * @param state state carrying the property
     * @return the blacklist patterns
     */
    private static List<Pattern> getBlacklist(State state) {
        List<String> blacklistEntries = state.getPropAsList(TOPIC_BLACKLIST, "");
        return DatasetFilterUtils.getPatternsFromStrings(blacklistEntries);
    }

    /**
     * Reads the {@code topic.whitelist} property (empty when absent) and converts its entries to patterns.
     *
     * @param state state carrying the property
     * @return the whitelist patterns
     */
    private static List<Pattern> getWhitelist(State state) {
        List<String> whitelistEntries = state.getPropAsList(TOPIC_WHITELIST, "");
        return DatasetFilterUtils.getPatternsFromStrings(whitelistEntries);
    }

    /**
     * Publishes the offset-related counters of this run to the source state and releases every
     * resource registered with the closer. A failure to close is logged, not propagated.
     *
     * @param sourceState state that receives the counters
     */
    @Override // gobblin.source.extractor.extract.EventBasedSource
    public void shutdown(SourceState sourceState) {
        final Integer tooEarly = Integer.valueOf(this.offsetTooEarlyCount);
        final Integer tooLate = Integer.valueOf(this.offsetTooLateCount);
        final Integer failedLookups = Integer.valueOf(this.failToGetOffsetCount);

        sourceState.setProp("offset.too.early.count", tooEarly);
        sourceState.setProp("offset.too.late.count", tooLate);
        sourceState.setProp("fail.to.get.offset.count", failedLookups);

        try {
            this.closer.close();
        } catch (IOException closeFailure) {
            LOG.error("Failed to close kafkaWrapper", closeFailure);
        }
    }
}
