package org.apache.druid.indexing.kafka.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.druid.indexing.kafka.KafkaSequenceNumber;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.class */
public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartition, Long, KafkaRecordEntity> {
    public static final TypeReference<TreeMap<Integer, Map<KafkaTopicPartition, Long>>> CHECKPOINTS_TYPE_REF = new TypeReference<TreeMap<Integer, Map<KafkaTopicPartition, Long>>>() { // from class: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.1
    };
    private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
    private static final Long NOT_SET = -1L;
    private static final Long END_OF_PARTITION = Long.MAX_VALUE;
    private final ServiceEmitter emitter;
    private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
    private final Pattern pattern;
    private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
    private final KafkaSupervisorSpec spec;

    public KafkaSupervisor(TaskStorage taskStorage, TaskMaster taskMaster, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, KafkaIndexTaskClientFactory kafkaIndexTaskClientFactory, ObjectMapper objectMapper, KafkaSupervisorSpec kafkaSupervisorSpec, RowIngestionMetersFactory rowIngestionMetersFactory) {
        super(StringUtils.format("KafkaSupervisor-%s", new Object[]{kafkaSupervisorSpec.getDataSchema().getDataSource()}), taskStorage, taskMaster, indexerMetadataStorageCoordinator, kafkaIndexTaskClientFactory, objectMapper, kafkaSupervisorSpec, rowIngestionMetersFactory, false);
        this.spec = kafkaSupervisorSpec;
        this.emitter = kafkaSupervisorSpec.getEmitter();
        this.monitorSchedulerConfig = kafkaSupervisorSpec.getMonitorSchedulerConfig();
        this.pattern = m20getIoConfig().isMultiTopic() ? Pattern.compile(m20getIoConfig().getStream()) : null;
    }

    protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> setupRecordSupplier() {
        return new KafkaRecordSupplier(this.spec.m24getIoConfig().getConsumerProperties(), this.sortingMapper, this.spec.m24getIoConfig().getConfigOverrides(), this.spec.m24getIoConfig().isMultiTopic());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getTaskGroupIdForPartition(KafkaTopicPartition kafkaTopicPartition) {
        Integer taskCount = this.spec.m24getIoConfig().getTaskCount();
        return kafkaTopicPartition.isMultiTopicPartition() ? Math.abs((31 * kafkaTopicPartition.topic().hashCode()) + kafkaTopicPartition.partition()) % taskCount.intValue() : kafkaTopicPartition.partition() % taskCount.intValue();
    }

    protected boolean checkSourceMetadataMatch(DataSourceMetadata dataSourceMetadata) {
        return dataSourceMetadata instanceof KafkaDataSourceMetadata;
    }

    protected boolean doesTaskTypeMatchSupervisor(Task task) {
        return task instanceof KafkaIndexTask;
    }

    protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> createReportPayload(int i, boolean z) {
        KafkaSupervisorIOConfig m24getIoConfig = this.spec.m24getIoConfig();
        Map<KafkaTopicPartition, Long> recordLagPerPartitionInLatestSequences = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
        return new KafkaSupervisorReportPayload(this.spec.getDataSchema().getDataSource(), m24getIoConfig.getStream(), i, m24getIoConfig.getReplicas().intValue(), m24getIoConfig.getTaskDuration().getMillis() / 1000, z ? this.latestSequenceFromStream : null, z ? recordLagPerPartitionInLatestSequences : null, z ? Long.valueOf(recordLagPerPartitionInLatestSequences.values().stream().mapToLong(l -> {
            return Math.max(l.longValue(), 0L);
        }).sum()) : null, z ? this.sequenceLastUpdated : null, this.spec.isSuspended(), this.stateManager.isHealthy(), this.stateManager.getSupervisorState().getBasicState(), this.stateManager.getSupervisorState(), this.stateManager.getExceptionEvents());
    }

    protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(int i, Map<KafkaTopicPartition, Long> map, Map<KafkaTopicPartition, Long> map2, String str, DateTime dateTime, DateTime dateTime2, Set<KafkaTopicPartition> set, SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig) {
        KafkaSupervisorIOConfig kafkaSupervisorIOConfig = (KafkaSupervisorIOConfig) seekableStreamSupervisorIOConfig;
        return new KafkaIndexTaskIOConfig(Integer.valueOf(i), str, null, null, new SeekableStreamStartSequenceNumbers(kafkaSupervisorIOConfig.getStream(), map, Collections.emptySet()), new SeekableStreamEndSequenceNumbers(kafkaSupervisorIOConfig.getStream(), map2), kafkaSupervisorIOConfig.getConsumerProperties(), Long.valueOf(kafkaSupervisorIOConfig.getPollTimeout()), true, dateTime, dateTime2, seekableStreamSupervisorIOConfig.getInputFormat(), kafkaSupervisorIOConfig.getConfigOverrides(), Boolean.valueOf(kafkaSupervisorIOConfig.isMultiTopic()));
    }

    protected List<SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEntity>> createIndexTasks(int i, String str, ObjectMapper objectMapper, TreeMap<Integer, Map<KafkaTopicPartition, Long>> treeMap, SeekableStreamIndexTaskIOConfig seekableStreamIndexTaskIOConfig, SeekableStreamIndexTaskTuningConfig seekableStreamIndexTaskTuningConfig, RowIngestionMetersFactory rowIngestionMetersFactory) throws JsonProcessingException {
        String writeValueAsString = objectMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(treeMap);
        Map createBaseTaskContexts = createBaseTaskContexts();
        createBaseTaskContexts.put("checkpoints", writeValueAsString);
        createBaseTaskContexts.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new KafkaIndexTask(IdUtils.getRandomIdWithPrefix(str), new TaskResource(str, 1), this.spec.getDataSchema(), (KafkaIndexTaskTuningConfig) seekableStreamIndexTaskTuningConfig, (KafkaIndexTaskIOConfig) seekableStreamIndexTaskIOConfig, createBaseTaskContexts, objectMapper));
        }
        return arrayList;
    }

    protected Map<KafkaTopicPartition, Long> getPartitionRecordLag() {
        Map highestCurrentOffsets = getHighestCurrentOffsets();
        if (this.latestSequenceFromStream == null) {
            return null;
        }
        if (!this.latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
            log.warn("Lag metric: Kafka partitions %s do not match task partitions %s", new Object[]{this.latestSequenceFromStream.keySet(), highestCurrentOffsets.keySet()});
        }
        return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets);
    }

    @Nullable
    protected Map<KafkaTopicPartition, Long> getPartitionTimeLag() {
        return null;
    }

    private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> map) {
        return this.latestSequenceFromStream == null ? Collections.emptyMap() : (Map) this.latestSequenceFromStream.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Long.valueOf(entry.getValue() != null ? ((Long) entry.getValue()).longValue() - ((Long) Optional.ofNullable((Long) map.get(entry.getKey())).orElse(0L)).longValue() : 0L);
        }));
    }

    protected Map<KafkaTopicPartition, Long> getRecordLagPerPartition(Map<KafkaTopicPartition, Long> map) {
        return (this.latestSequenceFromStream == null || map == null) ? Collections.emptyMap() : (Map) map.entrySet().stream().filter(entry -> {
            return this.latestSequenceFromStream.get(entry.getKey()) != null;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return Long.valueOf(entry2.getValue() != null ? this.latestSequenceFromStream.get(entry2.getKey()).longValue() - ((Long) entry2.getValue()).longValue() : 0L);
        }));
    }

    protected Map<KafkaTopicPartition, Long> getTimeLagPerPartition(Map<KafkaTopicPartition, Long> map) {
        return null;
    }

    protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String str, Map<KafkaTopicPartition, Long> map) {
        return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers(str, map));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public OrderedSequenceNumber<Long> makeSequenceNumber(Long l, boolean z) {
        return KafkaSequenceNumber.of(l);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: getNotSetMarker, reason: merged with bridge method [inline-methods] */
    public Long m18getNotSetMarker() {
        return NOT_SET;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: getEndOfPartitionMarker, reason: merged with bridge method [inline-methods] */
    public Long m17getEndOfPartitionMarker() {
        return END_OF_PARTITION;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isEndOfShard(Long l) {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isShardExpirationMarker(Long l) {
        return false;
    }

    protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() {
        return false;
    }

    public LagStats computeLagStats() {
        Map<KafkaTopicPartition, Long> partitionRecordLag = getPartitionRecordLag();
        return partitionRecordLag == null ? new LagStats(0L, 0L, 0L) : computeLags(partitionRecordLag);
    }

    protected void updatePartitionLagFromStream() {
        getRecordSupplierLock().lock();
        try {
            try {
                try {
                    Set set = (Set) this.recordSupplier.getPartitionIds(m20getIoConfig().getStream()).stream().map(kafkaTopicPartition -> {
                        return new StreamPartition(m20getIoConfig().getStream(), kafkaTopicPartition);
                    }).collect(Collectors.toSet());
                    this.recordSupplier.seekToLatest(set);
                    Stream stream = set.stream();
                    Function function = (v0) -> {
                        return v0.getPartitionId();
                    };
                    RecordSupplier recordSupplier = this.recordSupplier;
                    Objects.requireNonNull(recordSupplier);
                    this.latestSequenceFromStream = (Map) stream.collect(Collectors.toMap(function, recordSupplier::getPosition));
                    getRecordSupplierLock().unlock();
                } catch (InterruptedException e) {
                    throw new StreamException(e);
                }
            } catch (Exception e2) {
                log.warn("Could not fetch partitions for topic/stream [%s]", new Object[]{m20getIoConfig().getStream()});
                throw new StreamException(e2);
            }
        } catch (Throwable th) {
            getRecordSupplierLock().unlock();
            throw th;
        }
    }

    protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream() {
        return this.latestSequenceFromStream != null ? this.latestSequenceFromStream : new HashMap();
    }

    protected String baseTaskName() {
        return "index_kafka";
    }

    @VisibleForTesting
    /* renamed from: getIoConfig, reason: merged with bridge method [inline-methods] */
    public KafkaSupervisorIOConfig m20getIoConfig() {
        return this.spec.m24getIoConfig();
    }

    @VisibleForTesting
    public KafkaSupervisorTuningConfig getTuningConfig() {
        return this.spec.m25getTuningConfig();
    }

    protected boolean isMultiTopic() {
        return m20getIoConfig().isMultiTopic() && this.pattern != null;
    }

    protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage() {
        SeekableStreamSequenceNumbers seekableStreamSequenceNumbers;
        KafkaDataSourceMetadata retrieveDataSourceMetadata = retrieveDataSourceMetadata();
        if (!checkSourceMetadataMatch(retrieveDataSourceMetadata) || (seekableStreamSequenceNumbers = retrieveDataSourceMetadata.getSeekableStreamSequenceNumbers()) == null || seekableStreamSequenceNumbers.getPartitionSequenceNumberMap() == null) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        seekableStreamSequenceNumbers.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, l) -> {
            String stream = kafkaTopicPartition.topic().isPresent() ? kafkaTopicPartition.topic().get() : seekableStreamSequenceNumbers.getStream();
            KafkaTopicPartition matchingKafkaTopicPartition = getMatchingKafkaTopicPartition(kafkaTopicPartition, stream);
            if (matchingKafkaTopicPartition == null && !hashSet.contains(stream)) {
                log.warn("Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences", new Object[]{stream, m20getIoConfig().getStream()});
                hashSet.add(stream);
            }
            if (matchingKafkaTopicPartition != null) {
                hashMap.put(matchingKafkaTopicPartition, l);
            }
        });
        return hashMap;
    }

    @Nullable
    private KafkaTopicPartition getMatchingKafkaTopicPartition(KafkaTopicPartition kafkaTopicPartition, String str) {
        if (this.pattern != null ? this.pattern.matcher(str).matches() : m20getIoConfig().getStream().equals(str)) {
            return new KafkaTopicPartition(isMultiTopic(), str, kafkaTopicPartition.partition());
        }
        return null;
    }

    /* renamed from: createDataSourceMetaDataForReset, reason: collision with other method in class */
    protected /* bridge */ /* synthetic */ SeekableStreamDataSourceMetadata m19createDataSourceMetaDataForReset(String str, Map map) {
        return createDataSourceMetaDataForReset(str, (Map<KafkaTopicPartition, Long>) map);
    }
}
