package org.apache.druid.indexing.kinesis.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
import org.apache.druid.indexing.kinesis.KinesisIndexTask;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.indexing.kinesis.KinesisRecordSupplier;
import org.apache.druid.indexing.kinesis.KinesisSequenceNumber;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.class */
public class KinesisSupervisor extends SeekableStreamSupervisor<String, String> {
    private static final EmittingLogger log = new EmittingLogger(KinesisSupervisor.class);
    public static final TypeReference<TreeMap<Integer, Map<String, String>>> CHECKPOINTS_TYPE_REF = new TypeReference<TreeMap<Integer, Map<String, String>>>() { // from class: org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor.1
    };
    public static final String OFFSET_NOT_SET = "-1";
    private final KinesisSupervisorSpec spec;
    private final AWSCredentialsConfig awsCredentialsConfig;
    private volatile Map<String, Long> currentPartitionTimeLag;

    public KinesisSupervisor(TaskStorage taskStorage, TaskMaster taskMaster, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, KinesisIndexTaskClientFactory kinesisIndexTaskClientFactory, ObjectMapper objectMapper, KinesisSupervisorSpec kinesisSupervisorSpec, RowIngestionMetersFactory rowIngestionMetersFactory, AWSCredentialsConfig aWSCredentialsConfig) {
        super(StringUtils.format("KinesisSupervisor-%s", new Object[]{kinesisSupervisorSpec.getDataSchema().getDataSource()}), taskStorage, taskMaster, indexerMetadataStorageCoordinator, kinesisIndexTaskClientFactory, objectMapper, kinesisSupervisorSpec, rowIngestionMetersFactory, true);
        this.spec = kinesisSupervisorSpec;
        this.awsCredentialsConfig = aWSCredentialsConfig;
        this.currentPartitionTimeLag = null;
    }

    protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(int i, Map<String, String> map, Map<String, String> map2, String str, DateTime dateTime, DateTime dateTime2, Set<String> set, SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig) {
        KinesisSupervisorIOConfig kinesisSupervisorIOConfig = (KinesisSupervisorIOConfig) seekableStreamSupervisorIOConfig;
        return new KinesisIndexTaskIOConfig(i, str, new SeekableStreamStartSequenceNumbers(kinesisSupervisorIOConfig.getStream(), map, set), new SeekableStreamEndSequenceNumbers(kinesisSupervisorIOConfig.getStream(), map2), true, dateTime, dateTime2, kinesisSupervisorIOConfig.getInputFormat(), kinesisSupervisorIOConfig.getEndpoint(), kinesisSupervisorIOConfig.getRecordsPerFetch(), kinesisSupervisorIOConfig.getFetchDelayMillis(), kinesisSupervisorIOConfig.getAwsAssumedRoleArn(), kinesisSupervisorIOConfig.getAwsExternalId(), kinesisSupervisorIOConfig.isDeaggregate());
    }

    protected List<SeekableStreamIndexTask<String, String>> createIndexTasks(int i, String str, ObjectMapper objectMapper, TreeMap<Integer, Map<String, String>> treeMap, SeekableStreamIndexTaskIOConfig seekableStreamIndexTaskIOConfig, SeekableStreamIndexTaskTuningConfig seekableStreamIndexTaskTuningConfig, RowIngestionMetersFactory rowIngestionMetersFactory) throws JsonProcessingException {
        String writeValueAsString = objectMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(treeMap);
        Map createBaseTaskContexts = createBaseTaskContexts();
        createBaseTaskContexts.put("checkpoints", writeValueAsString);
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new KinesisIndexTask(TaskIdUtils.getRandomIdWithPrefix(str), new TaskResource(str, 1), this.spec.getDataSchema(), (KinesisIndexTaskTuningConfig) seekableStreamIndexTaskTuningConfig, (KinesisIndexTaskIOConfig) seekableStreamIndexTaskIOConfig, createBaseTaskContexts, null, null, rowIngestionMetersFactory, this.awsCredentialsConfig, null));
        }
        return arrayList;
    }

    protected RecordSupplier<String, String> setupRecordSupplier() throws RuntimeException {
        KinesisSupervisorIOConfig m20getIoConfig = this.spec.m20getIoConfig();
        KinesisSupervisorTuningConfig m21getTuningConfig = this.spec.m21getTuningConfig();
        return new KinesisRecordSupplier(KinesisRecordSupplier.getAmazonKinesisClient(m20getIoConfig.getEndpoint(), this.awsCredentialsConfig, m20getIoConfig.getAwsAssumedRoleArn(), m20getIoConfig.getAwsExternalId()), m20getIoConfig.getRecordsPerFetch().intValue(), m20getIoConfig.getFetchDelayMillis().intValue(), 0, m20getIoConfig.isDeaggregate(), m21getTuningConfig.getRecordBufferSize(), m21getTuningConfig.getRecordBufferOfferTimeout(), m21getTuningConfig.getRecordBufferFullWait(), m21getTuningConfig.getFetchSequenceNumberTimeout(), m21getTuningConfig.getMaxRecordsPerPoll(), m20getIoConfig.isUseEarliestSequenceNumber());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getTaskGroupIdForPartition(String str) {
        return getTaskGroupIdForPartitionWithProvidedList(str, this.partitionIds);
    }

    private int getTaskGroupIdForPartitionWithProvidedList(String str, List<String> list) {
        int indexOf = list.indexOf(str);
        return indexOf < 0 ? indexOf : list.indexOf(str) % this.spec.m20getIoConfig().getTaskCount().intValue();
    }

    protected Map<Integer, Set<String>> recomputePartitionGroupsForExpiration(Set<String> set) {
        ArrayList arrayList = new ArrayList(set);
        HashMap hashMap = new HashMap();
        for (String str : set) {
            ((Set) hashMap.computeIfAbsent(Integer.valueOf(getTaskGroupIdForPartitionWithProvidedList(str, arrayList)), num -> {
                return new HashSet();
            })).add(str);
        }
        return hashMap;
    }

    protected boolean checkSourceMetadataMatch(DataSourceMetadata dataSourceMetadata) {
        return dataSourceMetadata instanceof KinesisDataSourceMetadata;
    }

    protected boolean doesTaskTypeMatchSupervisor(Task task) {
        return task instanceof KinesisIndexTask;
    }

    protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(int i, boolean z) {
        KinesisSupervisorIOConfig m20getIoConfig = this.spec.m20getIoConfig();
        Map<String, Long> timeLagPerPartition = getTimeLagPerPartition(getHighestCurrentOffsets());
        return new KinesisSupervisorReportPayload(this.spec.getDataSchema().getDataSource(), m20getIoConfig.getStream(), Integer.valueOf(i), m20getIoConfig.getReplicas(), Long.valueOf(m20getIoConfig.getTaskDuration().getMillis() / 1000), this.spec.isSuspended(), this.stateManager.isHealthy(), this.stateManager.getSupervisorState().getBasicState(), this.stateManager.getSupervisorState(), this.stateManager.getExceptionEvents(), z ? timeLagPerPartition : null, z ? Long.valueOf(timeLagPerPartition.values().stream().mapToLong(l -> {
            return Math.max(l.longValue(), 0L);
        }).sum()) : null);
    }

    protected Map<String, Long> getRecordLagPerPartition(Map<String, String> map) {
        return ImmutableMap.of();
    }

    protected Map<String, Long> getTimeLagPerPartition(Map<String, String> map) {
        return (Map) map.entrySet().stream().filter(entry -> {
            return (entry.getValue() == null || this.currentPartitionTimeLag == null || this.currentPartitionTimeLag.get(entry.getKey()) == null) ? false : true;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return this.currentPartitionTimeLag.get(entry2.getKey());
        }));
    }

    protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(String str, Map<String, String> map) {
        return new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers(str, map));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public OrderedSequenceNumber<String> makeSequenceNumber(String str, boolean z) {
        return KinesisSequenceNumber.of(str, z);
    }

    protected void updatePartitionLagFromStream() {
        this.currentPartitionTimeLag = ((KinesisRecordSupplier) this.recordSupplier).getPartitionsTimeLag(getIoConfig().getStream(), getHighestCurrentOffsets());
    }

    protected Map<String, Long> getPartitionRecordLag() {
        return null;
    }

    protected Map<String, Long> getPartitionTimeLag() {
        return this.currentPartitionTimeLag;
    }

    protected String baseTaskName() {
        return "index_kinesis";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: getNotSetMarker, reason: merged with bridge method [inline-methods] */
    public String m16getNotSetMarker() {
        return OFFSET_NOT_SET;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: getEndOfPartitionMarker, reason: merged with bridge method [inline-methods] */
    public String m15getEndOfPartitionMarker() {
        return KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isEndOfShard(String str) {
        return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isShardExpirationMarker(String str) {
        return KinesisSequenceNumber.EXPIRED_MARKER.equals(str);
    }

    protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() {
        return true;
    }

    protected Map<String, OrderedSequenceNumber<String>> filterExpiredPartitionsFromStartingOffsets(Map<String, OrderedSequenceNumber<String>> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, OrderedSequenceNumber<String>> entry : map.entrySet()) {
            if (((String) entry.getValue().get()).equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)) {
                log.debug("Excluding shard[%s] because it has reached EOS.", new Object[]{entry.getKey()});
            } else {
                hashMap.put(entry.getKey(), entry.getValue());
            }
        }
        return hashMap;
    }

    protected boolean supportsPartitionExpiration() {
        return true;
    }

    protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions(SeekableStreamDataSourceMetadata<String, String> seekableStreamDataSourceMetadata, Set<String> set) {
        log.info("Marking expired shards in metadata: " + set, new Object[0]);
        return createDataSourceMetadataWithClosedOrExpiredPartitions(seekableStreamDataSourceMetadata, set, KinesisSequenceNumber.EXPIRED_MARKER);
    }

    protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedPartitions(SeekableStreamDataSourceMetadata<String, String> seekableStreamDataSourceMetadata, Set<String> set) {
        log.info("Marking closed shards in metadata: " + set, new Object[0]);
        return createDataSourceMetadataWithClosedOrExpiredPartitions(seekableStreamDataSourceMetadata, set, KinesisSequenceNumber.END_OF_SHARD_MARKER);
    }

    private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(SeekableStreamDataSourceMetadata<String, String> seekableStreamDataSourceMetadata, Set<String> set, String str) {
        SeekableStreamStartSequenceNumbers seekableStreamEndSequenceNumbers;
        SeekableStreamStartSequenceNumbers seekableStreamSequenceNumbers = ((KinesisDataSourceMetadata) seekableStreamDataSourceMetadata).getSeekableStreamSequenceNumbers();
        Map partitionSequenceNumberMap = seekableStreamSequenceNumbers.getPartitionSequenceNumberMap();
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : partitionSequenceNumberMap.entrySet()) {
            if (set.contains(entry.getKey())) {
                hashMap.put(entry.getKey(), str);
            } else {
                hashMap.put(entry.getKey(), entry.getValue());
            }
        }
        if (seekableStreamSequenceNumbers instanceof SeekableStreamStartSequenceNumbers) {
            HashSet hashSet = new HashSet();
            for (String str2 : seekableStreamSequenceNumbers.getExclusivePartitions()) {
                if (!set.contains(str2)) {
                    hashSet.add(str2);
                }
            }
            seekableStreamEndSequenceNumbers = new SeekableStreamStartSequenceNumbers(seekableStreamSequenceNumbers.getStream(), (String) null, hashMap, (Map) null, hashSet);
        } else {
            seekableStreamEndSequenceNumbers = new SeekableStreamEndSequenceNumbers(seekableStreamSequenceNumbers.getStream(), (String) null, hashMap, (Map) null);
        }
        return new KinesisDataSourceMetadata(seekableStreamEndSequenceNumbers);
    }
}
