package gobblin.source.extractor.extract.kafka;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.metrics.Tag;
import gobblin.source.extractor.DataRecordException;
import gobblin.source.extractor.extract.EventBasedExtractor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaExtractor.class */
/**
 * Base extractor that pulls the records of one Kafka topic, walking through the work unit's
 * partitions one after another.
 *
 * <p>For each partition, message buffers are fetched through {@link #kafkaWrapper} starting at the
 * partition's entry in {@link #nextWatermark} and stopping once that entry reaches the partition's
 * entry in {@link #highWatermark}. Each message is handed to {@link #decodeRecord(MessageAndOffset)};
 * messages that cannot be decoded are counted and skipped. On {@link #close()} the offsets actually
 * reached and the error counters are written back to the {@link WorkUnitState}.
 *
 * @param <S> schema type, passed through to {@link EventBasedExtractor}
 * @param <D> type of the decoded records returned by {@link #readRecordImpl(Object)}
 */
public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaExtractor.class);

    /** Value of the current partition index before the first partition has been selected. */
    protected static final int INITIAL_PARTITION_IDX = -1;

    /** Maximum number of decoding errors that are logged for a single partition. */
    protected static final Integer MAX_LOG_DECODING_ERRORS = 5;

    protected final WorkUnitState workUnitState;
    protected final String topicName;
    protected final List<KafkaPartition> partitions;

    /** Low watermark of the work unit, indexed like {@link #partitions}. */
    protected final MultiLongWatermark lowWatermark;

    /** Expected high watermark of the work unit, indexed like {@link #partitions}. */
    protected final MultiLongWatermark highWatermark;

    /** Next offset to pull for each partition; initialized from {@link #lowWatermark}. */
    protected final MultiLongWatermark nextWatermark;

    protected final KafkaWrapper kafkaWrapper;

    /** Measures the time spent on the partition currently being pulled. */
    protected final Stopwatch stopwatch;

    /** Number of decoding errors logged so far, per partition. */
    protected final Map<KafkaPartition, Integer> decodingErrorCount;

    private final Map<KafkaPartition, Double> avgMillisPerRecord;
    private final Map<KafkaPartition, Long> avgRecordSizes;

    /** Indices of the partitions that produced at least one undecodable message. */
    private final Set<Integer> errorPartitions;

    private int undecodableMessageCount;
    private Iterator<MessageAndOffset> messageIterator;
    private int currentPartitionIdx;
    private long currentPartitionRecordCount;
    private long currentPartitionTotalSize;

    /**
     * Creates an extractor for the topic and partitions described by the given work unit state.
     *
     * <p>The actual high watermark of the work unit state is initially set to the low watermark;
     * {@link #close()} replaces it with the offsets that were actually reached.
     *
     * @param workUnitState state of the work unit to extract
     */
    public KafkaExtractor(WorkUnitState workUnitState) {
        super(workUnitState);
        this.undecodableMessageCount = 0;
        this.messageIterator = null;
        this.currentPartitionIdx = INITIAL_PARTITION_IDX;
        this.currentPartitionRecordCount = 0L;
        this.currentPartitionTotalSize = 0L;
        this.workUnitState = workUnitState;
        this.topicName = KafkaUtils.getTopicName(workUnitState);
        this.partitions = KafkaUtils.getPartitions(workUnitState);
        this.lowWatermark = workUnitState.getWorkunit().getLowWatermark(MultiLongWatermark.class);
        this.highWatermark = workUnitState.getWorkunit().getExpectedHighWatermark(MultiLongWatermark.class);
        this.nextWatermark = new MultiLongWatermark(this.lowWatermark);
        // Registering with the closer ties the wrapper's lifetime to close().
        this.kafkaWrapper = (KafkaWrapper) this.closer.register(KafkaWrapper.create(workUnitState));
        this.stopwatch = Stopwatch.createUnstarted();
        this.decodingErrorCount = Maps.newHashMap();
        this.avgMillisPerRecord = Maps.newHashMapWithExpectedSize(this.partitions.size());
        this.avgRecordSizes = Maps.newHashMapWithExpectedSize(this.partitions.size());
        this.errorPartitions = Sets.newHashSet();
        this.workUnitState.setActualHighWatermark(this.lowWatermark);
    }

    /**
     * Returns the tags produced by the superclass plus a {@code kafkaTopic} tag holding the topic name.
     *
     * @param state state from which the topic name is read
     * @return the superclass tag list with the topic tag appended
     */
    public List<Tag<?>> generateTags(State state) {
        List<Tag<?>> tags = super.generateTags(state);
        tags.add(new Tag<>("kafkaTopic", KafkaUtils.getTopicName(state)));
        return tags;
    }

    /**
     * Returns the next decodable record of the topic, moving on to the next partition whenever the
     * current one is finished, yields no more messages, or fails to fetch.
     *
     * @param d record instance offered for reuse; not used by this implementation
     * @return the next decoded record, or {@code null} once all partitions are finished
     * @throws DataRecordException declared by the contract; not thrown directly by this method
     * @throws IOException declared by the contract; not thrown directly by this method
     */
    public D readRecordImpl(D d) throws DataRecordException, IOException {
        while (!allPartitionsFinished()) {
            if (currentPartitionFinished()) {
                moveToNextPartition();
                continue;
            }

            if (this.messageIterator == null || !this.messageIterator.hasNext()) {
                boolean hasMessages;
                try {
                    this.messageIterator = fetchNextMessageBuffer();
                    hasMessages = this.messageIterator != null && this.messageIterator.hasNext();
                } catch (Exception e) {
                    LOG.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.", getCurrentPartition()), e);
                    hasMessages = false;
                }
                if (!hasMessages) {
                    // Nothing to read from this partition: switch partitions and restart the outer
                    // loop so the (now null) iterator is never dereferenced and the new partition
                    // index is range-checked before it is used.
                    moveToNextPartition();
                    continue;
                }
            }

            while (!currentPartitionFinished() && this.messageIterator.hasNext()) {
                MessageAndOffset next = this.messageIterator.next();
                // Messages located before the next expected offset are skipped.
                if (next.offset() < this.nextWatermark.get(this.currentPartitionIdx)) {
                    continue;
                }
                // The watermark advances even when decoding fails, so a bad message is not retried.
                this.nextWatermark.set(this.currentPartitionIdx, next.nextOffset());
                try {
                    D decodeRecord = decodeRecord(next);
                    this.currentPartitionRecordCount++;
                    this.currentPartitionTotalSize += next.message().payloadSize();
                    return decodeRecord;
                } catch (Throwable th) {
                    this.errorPartitions.add(Integer.valueOf(this.currentPartitionIdx));
                    this.undecodableMessageCount++;
                    if (shouldLogError()) {
                        LOG.error(String.format("A record from partition %s cannot be decoded.", getCurrentPartition()), th);
                        incrementErrorCount();
                    }
                }
            }
        }
        LOG.info("Finished pulling topic " + this.topicName);
        return null;
    }

    /** Returns whether the partition index has moved past the last entry of the high watermark. */
    private boolean allPartitionsFinished() {
        return this.currentPartitionIdx != INITIAL_PARTITION_IDX && this.currentPartitionIdx >= this.highWatermark.size();
    }

    /**
     * Returns whether the current partition has nothing left to pull: either no partition has been
     * selected yet, or its next offset has reached its expected high watermark.
     */
    private boolean currentPartitionFinished() {
        if (this.currentPartitionIdx == INITIAL_PARTITION_IDX) {
            return true;
        }
        if (this.nextWatermark.get(this.currentPartitionIdx) < this.highWatermark.get(this.currentPartitionIdx)) {
            return false;
        }
        LOG.info("Finished pulling partition " + getCurrentPartition());
        return true;
    }

    /**
     * Advances to the next partition, recording the per-record averages of the partition being left
     * and restarting the stopwatch for the new one.
     */
    private void moveToNextPartition() {
        if (this.currentPartitionIdx == INITIAL_PARTITION_IDX) {
            LOG.info("Pulling topic " + this.topicName);
            this.currentPartitionIdx = 0;
        } else {
            this.stopwatch.stop();
            if (this.currentPartitionRecordCount != 0) {
                // Divide in floating point: integer division would truncate sub-millisecond averages to 0.
                double elapsedMillis = (double) this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
                this.avgMillisPerRecord.put(getCurrentPartition(), Double.valueOf(elapsedMillis / (double) this.currentPartitionRecordCount));
                this.avgRecordSizes.put(getCurrentPartition(), Long.valueOf(this.currentPartitionTotalSize / this.currentPartitionRecordCount));
            }
            this.currentPartitionIdx++;
            this.currentPartitionRecordCount = 0L;
            this.currentPartitionTotalSize = 0L;
            this.stopwatch.reset();
        }
        this.messageIterator = null;
        if (this.currentPartitionIdx < this.partitions.size()) {
            LOG.info(String.format("Pulling partition %s from offset %d to %d, range=%d", getCurrentPartition(), Long.valueOf(this.nextWatermark.get(this.currentPartitionIdx)), Long.valueOf(this.highWatermark.get(this.currentPartitionIdx)), Long.valueOf(this.highWatermark.get(this.currentPartitionIdx) - this.nextWatermark.get(this.currentPartitionIdx))));
            switchMetricContextToCurrentPartition();
        }
        this.stopwatch.start();
    }

    /** Switches the metric context to one tagged with the id of the current partition, if there is one. */
    private void switchMetricContextToCurrentPartition() {
        if (this.currentPartitionIdx >= this.partitions.size()) {
            return;
        }
        Tag<Integer> partitionTag = new Tag<Integer>("kafka_partition", Integer.valueOf(getCurrentPartition().getId()));
        switchMetricContext(Lists.<Tag<?>>newArrayList(partitionTag));
    }

    /** Fetches the next message buffer of the current partition, from its next offset up to its high watermark. */
    private Iterator<MessageAndOffset> fetchNextMessageBuffer() {
        return this.kafkaWrapper.fetchNextMessageBuffer(this.partitions.get(this.currentPartitionIdx), this.nextWatermark.get(this.currentPartitionIdx), this.highWatermark.get(this.currentPartitionIdx));
    }

    /** Returns whether fewer than {@link #MAX_LOG_DECODING_ERRORS} errors were logged for the current partition. */
    private boolean shouldLogError() {
        Integer loggedErrors = this.decodingErrorCount.get(getCurrentPartition());
        return loggedErrors == null || loggedErrors.intValue() < MAX_LOG_DECODING_ERRORS.intValue();
    }

    /** Adds one to the logged decoding error count of the current partition. */
    private void incrementErrorCount() {
        KafkaPartition partition = getCurrentPartition();
        Integer loggedErrors = this.decodingErrorCount.get(partition);
        int updated = loggedErrors == null ? 1 : loggedErrors.intValue() + 1;
        this.decodingErrorCount.put(partition, Integer.valueOf(updated));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the partition currently being pulled.
     *
     * @return the partition at the current partition index
     * @throws IndexOutOfBoundsException if no partition is selected yet or all partitions are finished
     */
    public KafkaPartition getCurrentPartition() {
        Preconditions.checkElementIndex(this.currentPartitionIdx, this.partitions.size(), "KafkaExtractor has finished extracting all partitions. There's no current partition.");
        return this.partitions.get(this.currentPartitionIdx);
    }

    /**
     * Decodes one Kafka message into a record.
     *
     * @param messageAndOffset the message to decode, together with its offset
     * @return the decoded record
     * @throws IOException if the message cannot be decoded
     */
    protected abstract D decodeRecord(MessageAndOffset messageAndOffset) throws IOException;

    /**
     * Returns the gap between the low watermark and the expected high watermark, as computed by
     * {@code MultiLongWatermark.getGap}.
     */
    public long getExpectedRecordCount() {
        return this.lowWatermark.getGap(this.highWatermark);
    }

    /**
     * Writes the error counters, the offsets actually reached and the recorded average pull time per
     * partition to the work unit state, then closes the resources registered with the closer.
     *
     * @throws IOException if closing the registered resources fails
     */
    public void close() throws IOException {
        try {
            this.workUnitState.setProp("error.partition.count", Integer.valueOf(this.errorPartitions.size()));
            this.workUnitState.setProp("error.message.undecodable.count", Integer.valueOf(this.undecodableMessageCount));
            for (int i = 0; i < this.partitions.size(); i++) {
                LOG.info(String.format("Actual high watermark for partition %s=%d, expected=%d", this.partitions.get(i), Long.valueOf(this.nextWatermark.get(i)), Long.valueOf(this.highWatermark.get(i))));
            }
            this.workUnitState.setActualHighWatermark(this.nextWatermark);
            for (KafkaPartition kafkaPartition : this.partitions) {
                Double avgMillis = this.avgMillisPerRecord.get(kafkaPartition);
                if (avgMillis != null) {
                    LOG.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", kafkaPartition, avgMillis));
                    KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, kafkaPartition, avgMillis.doubleValue());
                } else {
                    LOG.info(String.format("Avg time to pull a record for partition %s not recorded", kafkaPartition));
                }
            }
        } finally {
            // Release the registered resources even if reporting the statistics above fails.
            this.closer.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Copies the remaining bytes of the buffer into a new array, advancing the buffer's position to
     * its limit.
     *
     * @param byteBuffer the buffer to read from; may be {@code null}
     * @return the bytes between the buffer's position and limit, or {@code null} if the buffer is {@code null}
     */
    public static byte[] getBytes(ByteBuffer byteBuffer) {
        if (byteBuffer == null) {
            return null;
        }
        int remaining = byteBuffer.remaining();
        byte[] bytes = new byte[remaining];
        // The offset argument indexes the destination array, not the buffer, so it must be 0.
        byteBuffer.get(bytes, 0, remaining);
        return bytes;
    }

    /**
     * Always returns {@code 0}.
     *
     * @return {@code 0}
     */
    @Deprecated
    public long getHighWatermark() {
        return 0L;
    }
}
