package org.apache.druid.indexing.common.actions;

import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.task.CompactionIntervalSpec;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

@ManageLifecycle
/* loaded from: input_file:org/apache/druid/indexing/common/actions/SegmentAllocationQueue.class */
public class SegmentAllocationQueue {
    private static final Logger log = new Logger(SegmentAllocationQueue.class);
    private static final int MAX_QUEUE_SIZE = 2000;
    private static final int MAX_BATCH_SIZE = 500;
    private final long maxWaitTimeMillis;
    private final TaskLockbox taskLockbox;
    private final ScheduledExecutorService executor;
    private final IndexerMetadataStorageCoordinator metadataStorage;
    private final ServiceEmitter emitter;
    private final AtomicBoolean isLeader = new AtomicBoolean(false);
    private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
    private final BlockingDeque<AllocateRequestKey> processingQueue = new LinkedBlockingDeque(MAX_QUEUE_SIZE);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/common/actions/SegmentAllocationQueue$AllocateRequestBatch.class */
    public class AllocateRequestBatch {
        private final AllocateRequestKey key;
        private final Map<SegmentAllocateRequest, CompletableFuture<SegmentIdWithShardSpec>> requestToFuture = new HashMap();

        AllocateRequestBatch(AllocateRequestKey allocateRequestKey) {
            this.key = allocateRequestKey;
        }

        synchronized Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest segmentAllocateRequest) {
            SegmentAllocationQueue.log.debug("Adding request to batch [%s]: %s", new Object[]{this.key, segmentAllocateRequest.getAction()});
            return this.requestToFuture.computeIfAbsent(segmentAllocateRequest, segmentAllocateRequest2 -> {
                return new CompletableFuture();
            });
        }

        synchronized void transferRequestsFrom(AllocateRequestBatch allocateRequestBatch) {
            this.requestToFuture.putAll(allocateRequestBatch.requestToFuture);
            allocateRequestBatch.requestToFuture.clear();
        }

        synchronized Set<SegmentAllocateRequest> getRequests() {
            return new HashSet(this.requestToFuture.keySet());
        }

        synchronized void failPendingRequests(String str) {
            failPendingRequests((Throwable) new ISE(str, new Object[0]));
        }

        synchronized void failPendingRequests(Throwable th) {
            if (this.requestToFuture.isEmpty()) {
                return;
            }
            SegmentAllocationQueue.log.warn("Failing [%d] requests in batch [%s], reason [%s].", new Object[]{Integer.valueOf(size()), th.getMessage(), this.key});
            this.requestToFuture.values().forEach(completableFuture -> {
                completableFuture.completeExceptionally(th);
            });
            this.requestToFuture.keySet().forEach(segmentAllocateRequest -> {
                SegmentAllocationQueue.this.emitTaskMetric("task/action/failed/count", 1L, segmentAllocateRequest);
            });
            this.requestToFuture.clear();
        }

        synchronized void completePendingRequestsWithNull() {
            if (this.requestToFuture.isEmpty()) {
                return;
            }
            this.requestToFuture.values().forEach(completableFuture -> {
                completableFuture.complete(null);
            });
            this.requestToFuture.keySet().forEach(segmentAllocateRequest -> {
                SegmentAllocationQueue.this.emitTaskMetric("task/action/failed/count", 1L, segmentAllocateRequest);
            });
            this.requestToFuture.clear();
        }

        synchronized void handleResult(SegmentAllocateResult segmentAllocateResult, SegmentAllocateRequest segmentAllocateRequest) {
            segmentAllocateRequest.incrementAttempts();
            if (segmentAllocateResult.isSuccess()) {
                SegmentAllocationQueue.this.emitTaskMetric("task/action/success/count", 1L, segmentAllocateRequest);
                this.requestToFuture.remove(segmentAllocateRequest).complete(segmentAllocateResult.getSegmentId());
            } else {
                if (segmentAllocateRequest.canRetry()) {
                    SegmentAllocationQueue.log.info("Allocation failed on attempt [%d] due to error [%s]. Can still retry action [%s].", new Object[]{Integer.valueOf(segmentAllocateRequest.getAttempts()), segmentAllocateResult.getErrorMessage(), segmentAllocateRequest.getAction()});
                    return;
                }
                SegmentAllocationQueue.this.emitTaskMetric("task/action/failed/count", 1L, segmentAllocateRequest);
                SegmentAllocationQueue.log.error("Exhausted max attempts [%d] for allocation with latest error [%s]. Completing action [%s] with a null value.", new Object[]{Integer.valueOf(segmentAllocateRequest.getAttempts()), segmentAllocateResult.getErrorMessage(), segmentAllocateRequest.getAction()});
                this.requestToFuture.remove(segmentAllocateRequest).complete(null);
            }
        }

        synchronized boolean isEmpty() {
            return this.requestToFuture.isEmpty();
        }

        synchronized int size() {
            return this.requestToFuture.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/common/actions/SegmentAllocationQueue$AllocateRequestKey.class */
    public static class AllocateRequestKey {
        private final int batchIncrementalId;
        private long queueTimeMillis;
        private final long maxWaitTimeMillis;
        private final String dataSource;
        private final String groupId;
        private final Interval preferredAllocationInterval;
        private final Granularity preferredSegmentGranularity;
        private final boolean skipSegmentLineageCheck;
        private final LockGranularity lockGranularity;
        private final boolean useNonRootGenPartitionSpace;
        private final int hash;
        private final String serialized;

        AllocateRequestKey(SegmentAllocateRequest segmentAllocateRequest, long j, int i) {
            SegmentAllocateAction action = segmentAllocateRequest.getAction();
            Task task = segmentAllocateRequest.getTask();
            this.batchIncrementalId = i;
            this.dataSource = action.getDataSource();
            this.groupId = task.getGroupId();
            this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck();
            this.lockGranularity = action.getLockGranularity();
            this.useNonRootGenPartitionSpace = action.getPartialShardSpec().useNonRootGenerationPartitionSpace();
            this.preferredSegmentGranularity = action.getPreferredSegmentGranularity();
            this.preferredAllocationInterval = action.getPreferredSegmentGranularity().bucket(action.getTimestamp());
            this.hash = Objects.hash(this.dataSource, this.groupId, Integer.valueOf(i), Boolean.valueOf(this.skipSegmentLineageCheck), Boolean.valueOf(this.useNonRootGenPartitionSpace), this.preferredAllocationInterval, this.lockGranularity);
            this.serialized = serialize();
            this.maxWaitTimeMillis = j;
        }

        void resetQueueTime() {
            this.queueTimeMillis = System.currentTimeMillis();
        }

        long getQueueTime() {
            return this.queueTimeMillis;
        }

        boolean isDue() {
            return System.currentTimeMillis() - this.queueTimeMillis >= this.maxWaitTimeMillis;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            AllocateRequestKey allocateRequestKey = (AllocateRequestKey) obj;
            return this.dataSource.equals(allocateRequestKey.dataSource) && this.groupId.equals(allocateRequestKey.groupId) && this.batchIncrementalId == allocateRequestKey.batchIncrementalId && this.skipSegmentLineageCheck == allocateRequestKey.skipSegmentLineageCheck && this.useNonRootGenPartitionSpace == allocateRequestKey.useNonRootGenPartitionSpace && this.preferredAllocationInterval.equals(allocateRequestKey.preferredAllocationInterval) && this.lockGranularity == allocateRequestKey.lockGranularity;
        }

        public int hashCode() {
            return this.hash;
        }

        public String toString() {
            return this.serialized;
        }

        private String serialize() {
            return "{datasource='" + this.dataSource + "', groupId='" + this.groupId + "', batchId=" + this.batchIncrementalId + ", lock=" + this.lockGranularity + ", allocInterval=" + this.preferredAllocationInterval + ", skipLineageCheck=" + this.skipSegmentLineageCheck + '}';
        }
    }

    @Inject
    public SegmentAllocationQueue(TaskLockbox taskLockbox, TaskLockConfig taskLockConfig, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, ServiceEmitter serviceEmitter, ScheduledExecutorFactory scheduledExecutorFactory) {
        this.emitter = serviceEmitter;
        this.taskLockbox = taskLockbox;
        this.metadataStorage = indexerMetadataStorageCoordinator;
        this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime();
        this.executor = taskLockConfig.isBatchSegmentAllocation() ? scheduledExecutorFactory.create(1, "SegmentAllocQueue-%s") : null;
    }

    @LifecycleStart
    public void start() {
        if (isEnabled()) {
            log.info("Initializing segment allocation queue.", new Object[0]);
            scheduleQueuePoll(this.maxWaitTimeMillis);
        }
    }

    @LifecycleStop
    public void stop() {
        if (isEnabled()) {
            log.info("Tearing down segment allocation queue.", new Object[0]);
            this.executor.shutdownNow();
        }
    }

    public void becomeLeader() {
        if (!this.isLeader.compareAndSet(false, true)) {
            log.info("Already the leader. Queue processing has started.", new Object[0]);
        } else if (isEnabled()) {
            log.info("Elected leader. Starting queue processing.", new Object[0]);
        } else {
            log.info("Elected leader but batched segment allocation is disabled. Segment allocation queue will not be used.", new Object[0]);
        }
    }

    public void stopBeingLeader() {
        if (!this.isLeader.compareAndSet(true, false)) {
            log.info("Already surrendered leadership. Queue processing is stopped.", new Object[0]);
        } else if (isEnabled()) {
            log.info("Not leader anymore. Stopping queue processing.", new Object[0]);
        } else {
            log.info("Not leader anymore. Segment allocation queue is already disabled.", new Object[0]);
        }
    }

    public boolean isEnabled() {
        return (this.executor == null || this.executor.isShutdown()) ? false : true;
    }

    private void scheduleQueuePoll(long j) {
        this.executor.schedule(this::processBatchesDue, j, TimeUnit.MILLISECONDS);
    }

    public int size() {
        return this.processingQueue.size();
    }

    public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest segmentAllocateRequest) {
        if (!this.isLeader.get()) {
            throw new ISE("Cannot allocate segment if not leader.", new Object[0]);
        }
        if (!isEnabled()) {
            throw new ISE("Batched segment allocation is disabled.", new Object[0]);
        }
        AllocateRequestKey keyForAvailableBatch = getKeyForAvailableBatch(segmentAllocateRequest);
        AtomicReference atomicReference = new AtomicReference();
        this.keyToBatch.compute(keyForAvailableBatch, (allocateRequestKey, allocateRequestBatch) -> {
            if (allocateRequestBatch != null) {
                atomicReference.set(allocateRequestBatch.add(segmentAllocateRequest));
                return allocateRequestBatch;
            }
            AllocateRequestBatch allocateRequestBatch = new AllocateRequestBatch(allocateRequestKey);
            atomicReference.set(allocateRequestBatch.add(segmentAllocateRequest));
            if (addBatchToQueue(allocateRequestBatch)) {
                return allocateRequestBatch;
            }
            return null;
        });
        return (Future) atomicReference.get();
    }

    private AllocateRequestKey getKeyForAvailableBatch(SegmentAllocateRequest segmentAllocateRequest) {
        for (int i = 0; i < MAX_QUEUE_SIZE; i++) {
            AllocateRequestKey allocateRequestKey = new AllocateRequestKey(segmentAllocateRequest, this.maxWaitTimeMillis, i);
            AllocateRequestBatch allocateRequestBatch = this.keyToBatch.get(allocateRequestKey);
            if (allocateRequestBatch == null || allocateRequestBatch.size() < MAX_BATCH_SIZE) {
                return allocateRequestKey;
            }
        }
        throw new ISE("Allocation queue is at capacity, all batches are full.", new Object[0]);
    }

    private boolean addBatchToQueue(AllocateRequestBatch allocateRequestBatch) {
        allocateRequestBatch.key.resetQueueTime();
        if (!this.isLeader.get()) {
            allocateRequestBatch.failPendingRequests("Not leader anymore");
            return false;
        }
        if (this.processingQueue.offer(allocateRequestBatch.key)) {
            log.debug("Added a new batch [%s] to queue.", new Object[]{allocateRequestBatch.key});
            return true;
        }
        allocateRequestBatch.failPendingRequests("Segment allocation queue is full. Check the metric `task/action/batch/runTime` to determine if metadata operations are slow.");
        return false;
    }

    private void requeueBatch(AllocateRequestBatch allocateRequestBatch) {
        log.info("Requeueing [%d] failed requests in batch [%s].", new Object[]{Integer.valueOf(allocateRequestBatch.size()), allocateRequestBatch.key});
        this.keyToBatch.compute(allocateRequestBatch.key, (allocateRequestKey, allocateRequestBatch2) -> {
            if (allocateRequestBatch2 != null) {
                allocateRequestBatch2.transferRequestsFrom(allocateRequestBatch);
                return allocateRequestBatch2;
            }
            if (addBatchToQueue(allocateRequestBatch)) {
                return allocateRequestBatch;
            }
            return null;
        });
    }

    private void processBatchesDue() {
        boolean z;
        clearQueueIfNotLeader();
        int i = 0;
        AllocateRequestKey peekFirst = this.processingQueue.peekFirst();
        while (true) {
            AllocateRequestKey allocateRequestKey = peekFirst;
            if (allocateRequestKey == null || !allocateRequestKey.isDue()) {
                break;
            }
            this.processingQueue.pollFirst();
            AllocateRequestBatch remove = this.keyToBatch.remove(allocateRequestKey);
            try {
                z = processBatch(remove);
            } catch (Throwable th) {
                remove.failPendingRequests(th);
                z = true;
                log.error(th, "Error while processing batch [%s]", new Object[]{allocateRequestKey});
            }
            if (z) {
                i++;
            } else {
                requeueBatch(remove);
            }
            peekFirst = this.processingQueue.peek();
        }
        long max = this.processingQueue.isEmpty() ? this.maxWaitTimeMillis : Math.max(0L, this.maxWaitTimeMillis - (System.currentTimeMillis() - this.processingQueue.peek().getQueueTime()));
        scheduleQueuePoll(max);
        log.debug("Processed [%d] batches, next execution in [%d ms]", new Object[]{Integer.valueOf(i), Long.valueOf(max)});
    }

    private void clearQueueIfNotLeader() {
        int i = 0;
        AllocateRequestKey peekFirst = this.processingQueue.peekFirst();
        while (true) {
            AllocateRequestKey allocateRequestKey = peekFirst;
            if (allocateRequestKey == null || this.isLeader.get()) {
                break;
            }
            this.processingQueue.pollFirst();
            this.keyToBatch.remove(allocateRequestKey).failPendingRequests("Not leader anymore");
            i++;
            peekFirst = this.processingQueue.peekFirst();
        }
        if (i > 0) {
            log.info("Not leader. Failed [%d] batches, remaining in queue [%d].", new Object[]{Integer.valueOf(i), Integer.valueOf(this.processingQueue.size())});
        }
    }

    private boolean processBatch(AllocateRequestBatch allocateRequestBatch) {
        AllocateRequestKey allocateRequestKey = allocateRequestBatch.key;
        if (allocateRequestBatch.isEmpty()) {
            return true;
        }
        if (!this.isLeader.get()) {
            allocateRequestBatch.failPendingRequests("Not leader anymore");
            return true;
        }
        log.debug("Processing [%d] requests for batch [%s], queue time [%s].", new Object[]{Integer.valueOf(allocateRequestBatch.size()), allocateRequestKey, Long.valueOf(allocateRequestKey.getQueueTime())});
        long currentTimeMillis = System.currentTimeMillis();
        int size = allocateRequestBatch.size();
        emitBatchMetric("task/action/batch/size", size, allocateRequestKey);
        emitBatchMetric("task/action/batch/queueTime", currentTimeMillis - allocateRequestKey.getQueueTime(), allocateRequestKey);
        Set<DataSegment> retrieveUsedSegments = retrieveUsedSegments(allocateRequestKey);
        int allocateSegmentsForBatch = allocateSegmentsForBatch(allocateRequestBatch, retrieveUsedSegments);
        emitBatchMetric("task/action/batch/attempts", 1L, allocateRequestKey);
        emitBatchMetric("task/action/batch/runTime", System.currentTimeMillis() - currentTimeMillis, allocateRequestKey);
        log.info("Successfully processed [%d / %d] requests in batch [%s].", new Object[]{Integer.valueOf(allocateSegmentsForBatch), Integer.valueOf(size), allocateRequestKey});
        if (allocateRequestBatch.isEmpty()) {
            return true;
        }
        log.debug("There are [%d] failed requests in batch [%s].", new Object[]{Integer.valueOf(allocateRequestBatch.size()), allocateRequestKey});
        if (!retrieveUsedSegments(allocateRequestKey).equals(retrieveUsedSegments)) {
            log.debug("Used segments have changed. Requeuing failed requests.", new Object[0]);
            return false;
        }
        log.warn("Completing [%d] failed requests in batch [%s] with null value as there are conflicting segments. Cannot retry allocation until the set of used segments overlapping the allocation interval [%s] changes.", new Object[]{Integer.valueOf(size()), allocateRequestKey, allocateRequestKey.preferredAllocationInterval});
        allocateRequestBatch.completePendingRequestsWithNull();
        return true;
    }

    private Set<DataSegment> retrieveUsedSegments(AllocateRequestKey allocateRequestKey) {
        return new HashSet(this.metadataStorage.retrieveUsedSegmentsForInterval(allocateRequestKey.dataSource, allocateRequestKey.preferredAllocationInterval, Segments.ONLY_VISIBLE));
    }

    private int allocateSegmentsForBatch(AllocateRequestBatch allocateRequestBatch, Set<DataSegment> set) {
        int i = 0;
        Set<SegmentAllocateRequest> requests = allocateRequestBatch.getRequests();
        HashSet hashSet = new HashSet();
        ArrayList arrayList = new ArrayList();
        if (set.isEmpty()) {
            hashSet.addAll(requests);
        } else {
            Interval[] sortedIntervals = getSortedIntervals(set);
            HashMap hashMap = new HashMap();
            for (SegmentAllocateRequest segmentAllocateRequest : requests) {
                Interval findOverlappingInterval = Intervals.findOverlappingInterval(segmentAllocateRequest.getRowInterval(), sortedIntervals);
                if (findOverlappingInterval == null) {
                    hashSet.add(segmentAllocateRequest);
                } else if (findOverlappingInterval.contains(segmentAllocateRequest.getRowInterval())) {
                    ((List) hashMap.computeIfAbsent(findOverlappingInterval, interval -> {
                        return new ArrayList();
                    })).add(segmentAllocateRequest);
                } else {
                    arrayList.add(segmentAllocateRequest);
                }
            }
            for (Map.Entry entry : hashMap.entrySet()) {
                i += allocateSegmentsForInterval((Interval) entry.getKey(), (List) entry.getValue(), allocateRequestBatch);
            }
        }
        HashSet hashSet2 = new HashSet(hashSet);
        Iterator it = Granularity.granularitiesFinerThan(allocateRequestBatch.key.preferredSegmentGranularity).iterator();
        while (it.hasNext()) {
            for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry2 : getRequestsByInterval(hashSet2, (Granularity) it.next()).entrySet()) {
                i += allocateSegmentsForInterval(entry2.getKey(), entry2.getValue(), allocateRequestBatch);
                hashSet2.retainAll(allocateRequestBatch.getRequests());
            }
        }
        if (!arrayList.isEmpty()) {
            log.info("Found [%d] requests in batch [%s] with row intervals that partially overlap existing segments. These cannot be processed until the set of used segments changes. Example request: [%s]", new Object[]{Integer.valueOf(arrayList.size()), allocateRequestBatch.key, arrayList.get(0)});
        }
        return i;
    }

    private Interval[] getSortedIntervals(Set<DataSegment> set) {
        TreeSet treeSet = new TreeSet(Comparators.intervalsByStartThenEnd());
        set.forEach(dataSegment -> {
            treeSet.add(dataSegment.getInterval());
        });
        return (Interval[]) treeSet.toArray(new Interval[0]);
    }

    private int allocateSegmentsForInterval(Interval interval, List<SegmentAllocateRequest> list, AllocateRequestBatch allocateRequestBatch) {
        if (list.isEmpty()) {
            return 0;
        }
        AllocateRequestKey allocateRequestKey = allocateRequestBatch.key;
        log.debug("Trying allocation for [%d] requests, interval [%s] in batch [%s]", new Object[]{Integer.valueOf(list.size()), interval, allocateRequestKey});
        List<SegmentAllocateResult> allocateSegments = this.taskLockbox.allocateSegments(list, allocateRequestKey.dataSource, interval, allocateRequestKey.skipSegmentLineageCheck, allocateRequestKey.lockGranularity);
        int i = 0;
        for (int i2 = 0; i2 < list.size(); i2++) {
            SegmentAllocateRequest segmentAllocateRequest = list.get(i2);
            SegmentAllocateResult segmentAllocateResult = allocateSegments.get(i2);
            if (segmentAllocateResult.isSuccess()) {
                i++;
            }
            allocateRequestBatch.handleResult(segmentAllocateResult, segmentAllocateRequest);
        }
        return i;
    }

    private Map<Interval, List<SegmentAllocateRequest>> getRequestsByInterval(Set<SegmentAllocateRequest> set, Granularity granularity) {
        HashMap hashMap = new HashMap();
        for (SegmentAllocateRequest segmentAllocateRequest : set) {
            Interval bucket = granularity.bucket(segmentAllocateRequest.getAction().getTimestamp());
            if (bucket.contains(segmentAllocateRequest.getRowInterval())) {
                ((List) hashMap.computeIfAbsent(bucket, interval -> {
                    return new ArrayList();
                })).add(segmentAllocateRequest);
            }
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void emitTaskMetric(String str, long j, SegmentAllocateRequest segmentAllocateRequest) {
        ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
        IndexTaskUtils.setTaskDimensions(builder, segmentAllocateRequest.getTask());
        builder.setDimension("taskActionType", SegmentAllocateAction.TYPE);
        this.emitter.emit(builder.build(str, Long.valueOf(j)));
    }

    private void emitBatchMetric(String str, long j, AllocateRequestKey allocateRequestKey) {
        ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
        builder.setDimension("taskActionType", SegmentAllocateAction.TYPE);
        builder.setDimension("dataSource", allocateRequestKey.dataSource);
        builder.setDimension(CompactionIntervalSpec.TYPE, allocateRequestKey.preferredAllocationInterval.toString());
        this.emitter.emit(builder.build(str, Long.valueOf(j)));
    }
}
