package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.BaseProgressIndicator;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.IndexableAdapter;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import org.joda.time.Period;

/* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.class */
public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager {
    /** Class-wide logger; static so that it is created once rather than once per manager instance. */
    private static final Logger LOG = new Logger(UnifiedIndexerAppenderatorsManager.class);

    /**
     * Bundles keyed by datasource name. Lookups and mutations performed by this class happen inside
     * {@code synchronized (this)} blocks.
     */
    private final Map<String, DatasourceBundle> datasourceBundles = new HashMap<>();

    // Collaborators injected through the constructor; they are handed to the walker of every DatasourceBundle.
    private final QueryProcessingPool queryProcessingPool;
    private final JoinableFactory joinableFactory;
    private final WorkerConfig workerConfig;
    private final Cache cache;
    private final CacheConfig cacheConfig;
    private final CachePopulatorStats cachePopulatorStats;
    private final ObjectMapper objectMapper;
    private final ServiceEmitter serviceEmitter;
    private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;

    /** Executor that runs persist/merge work for wrapped index mergers; set to {@code null} by {@link #shutdown()}. */
    private ListeningExecutorService mergeExecutor;

    /**
     * Holds what this manager tracks for a single datasource: one {@link SinkQuerySegmentWalker} and the
     * appenderators registered through {@link #addAppenderator(String, Appenderator)}, grouped by the key
     * supplied at registration.
     */
    @VisibleForTesting
    public class DatasourceBundle {
        private final SinkQuerySegmentWalker walker;
        private final Map<String, List<Appenderator>> taskAppenderatorMap = new HashMap<>();

        /**
         * Builds the walker for the given datasource from the enclosing manager's collaborators.
         *
         * @param str datasource name handed to the walker
         * @throws NullPointerException if the enclosing manager's cache is {@code null}
         */
        public DatasourceBundle(String str) {
            // Arguments are kept inline so they are evaluated in the same order as before.
            this.walker = new SinkQuerySegmentWalker(
                    str,
                    new VersionedIntervalTimeline<>(String.CASE_INSENSITIVE_ORDER),
                    objectMapper,
                    serviceEmitter,
                    queryRunnerFactoryConglomerateProvider.get(),
                    queryProcessingPool,
                    joinableFactory,
                    Preconditions.checkNotNull(cache, "cache"),
                    cacheConfig,
                    cachePopulatorStats
            );
        }

        /** @return the walker created for this bundle */
        public SinkQuerySegmentWalker getWalker() {
            return walker;
        }

        /**
         * Appends {@code appenderator} to the list stored under {@code str}, creating the list on first use.
         *
         * @param str          key the appenderator is grouped under
         * @param appenderator appenderator to record
         */
        public void addAppenderator(String str, Appenderator appenderator) {
            final List<Appenderator> registered = taskAppenderatorMap.computeIfAbsent(str, key -> new ArrayList<>());
            registered.add(appenderator);
        }
    }

    /**
     * {@link IndexMerger} wrapper that submits persist and queryable-index merge calls to a supplied
     * executor and blocks the calling thread until the submitted work finishes.
     *
     * <p>{@link #merge} is not supported and always throws {@link UOE}.
     */
    public static class LimitedPoolIndexMerger implements IndexMerger {
        private static final String ERROR_MSG = "Shouldn't be called";
        private final IndexMerger baseMerger;
        private final ListeningExecutorService mergeExecutor;

        /**
         * @param indexMerger              merger that performs the actual work
         * @param listeningExecutorService executor on which the work is run
         */
        public LimitedPoolIndexMerger(IndexMerger indexMerger, ListeningExecutorService listeningExecutorService) {
            this.baseMerger = indexMerger;
            this.mergeExecutor = listeningExecutorService;
        }

        /**
         * Runs {@code baseMerger.persist(...)} on the executor and waits for its result.
         *
         * @return the file returned by the wrapped merger
         * @throws RuntimeException wrapping any failure of the submitted task, or the
         *                          {@link InterruptedException} if the waiting thread is interrupted
         */
        @Override
        public File persist(IncrementalIndex incrementalIndex, Interval interval, File file, IndexSpec indexSpec, ProgressIndicator progressIndicator, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory) {
            final ListenableFuture<File> pending = mergeExecutor.submit(
                    () -> baseMerger.persist(incrementalIndex, interval, file, indexSpec, progressIndicator, segmentWriteOutMediumFactory)
            );
            return awaitResult(pending);
        }

        /**
         * Unsupported operation.
         *
         * @throws UOE always
         */
        @Override
        public File merge(List<IndexableAdapter> list, boolean z, AggregatorFactory[] aggregatorFactoryArr, File file, DimensionsSpec dimensionsSpec, IndexSpec indexSpec, int i) {
            throw new UOE(ERROR_MSG);
        }

        /**
         * Runs {@code baseMerger.mergeQueryableIndex(...)} on the executor and waits for its result.
         *
         * @return the file returned by the wrapped merger
         * @throws RuntimeException wrapping any failure of the submitted task, or the
         *                          {@link InterruptedException} if the waiting thread is interrupted
         */
        @Override
        public File mergeQueryableIndex(List<QueryableIndex> list, boolean z, AggregatorFactory[] aggregatorFactoryArr, @Nullable DimensionsSpec dimensionsSpec, File file, IndexSpec indexSpec, IndexSpec indexSpec2, ProgressIndicator progressIndicator, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, int i) {
            // NOTE(review): the caller-supplied progressIndicator is not forwarded; a fresh
            // BaseProgressIndicator is used instead. Kept as-is; confirm whether this is intentional.
            final ListenableFuture<File> pending = mergeExecutor.submit(
                    () -> baseMerger.mergeQueryableIndex(list, z, aggregatorFactoryArr, dimensionsSpec, file, indexSpec, indexSpec2, new BaseProgressIndicator(), segmentWriteOutMediumFactory, i)
            );
            return awaitResult(pending);
        }

        /**
         * Blocks until {@code future} completes and returns its value. If waiting fails for any reason the
         * task is cancelled so it does not keep occupying the shared pool, and an interrupt of the waiting
         * thread is re-asserted before the failure is rethrown.
         */
        private static File awaitResult(ListenableFuture<File> future) {
            try {
                return future.get();
            } catch (InterruptedException e) {
                future.cancel(true);
                // Restore the flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (Exception e) {
                future.cancel(true);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * {@link AppenderatorConfig} view over another config that substitutes its own in-memory limits:
     * {@link #getMaxBytesInMemory()} returns the value given at construction and
     * {@link #getMaxRowsInMemory()} always returns {@link Integer#MAX_VALUE}. Every other getter
     * forwards to the wrapped config.
     */
    public static class MemoryParameterOverridingAppenderatorConfig implements AppenderatorConfig {
        private final AppenderatorConfig baseConfig;
        private final long newMaxBytesInMemory;

        /**
         * @param appenderatorConfig config to delegate to
         * @param j                  value reported by {@link #getMaxBytesInMemory()}
         */
        public MemoryParameterOverridingAppenderatorConfig(AppenderatorConfig appenderatorConfig, long j) {
            baseConfig = appenderatorConfig;
            newMaxBytesInMemory = j;
        }

        // ---- overridden memory parameters ----

        /** Always {@link Integer#MAX_VALUE}, regardless of the wrapped config. */
        @Override
        public int getMaxRowsInMemory() {
            return Integer.MAX_VALUE;
        }

        /** Returns the byte limit supplied to the constructor, not the wrapped config's value. */
        @Override
        public long getMaxBytesInMemory() {
            return newMaxBytesInMemory;
        }

        /**
         * Returns this same instance; the {@code file} argument is not used.
         * NOTE(review): the requested directory is ignored here - confirm callers never rely on it.
         */
        @Override
        public AppenderatorConfig withBasePersistDirectory(File file) {
            return this;
        }

        // ---- straight delegation to the wrapped config ----

        @Override
        public boolean isReportParseExceptions() {
            return baseConfig.isReportParseExceptions();
        }

        @Override
        public AppendableIndexSpec getAppendableIndexSpec() {
            return baseConfig.getAppendableIndexSpec();
        }

        @Override
        public boolean isSkipBytesInMemoryOverheadCheck() {
            return baseConfig.isSkipBytesInMemoryOverheadCheck();
        }

        @Override
        public int getMaxPendingPersists() {
            return baseConfig.getMaxPendingPersists();
        }

        @Override
        @Nullable
        public Integer getMaxRowsPerSegment() {
            return baseConfig.getMaxRowsPerSegment();
        }

        @Override
        @Nullable
        public Long getMaxTotalRows() {
            return baseConfig.getMaxTotalRows();
        }

        @Override
        public PartitionsSpec getPartitionsSpec() {
            return baseConfig.getPartitionsSpec();
        }

        @Override
        public Period getIntermediatePersistPeriod() {
            return baseConfig.getIntermediatePersistPeriod();
        }

        @Override
        public IndexSpec getIndexSpec() {
            return baseConfig.getIndexSpec();
        }

        @Override
        public IndexSpec getIndexSpecForIntermediatePersists() {
            return baseConfig.getIndexSpecForIntermediatePersists();
        }

        @Override
        public File getBasePersistDirectory() {
            return baseConfig.getBasePersistDirectory();
        }

        @Override
        @Nullable
        public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
            return baseConfig.getSegmentWriteOutMediumFactory();
        }
    }

    @Inject
    public UnifiedIndexerAppenderatorsManager(QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, WorkerConfig workerConfig, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats, ObjectMapper objectMapper, ServiceEmitter serviceEmitter, Provider<QueryRunnerFactoryConglomerate> provider) {
        this.queryProcessingPool = queryProcessingPool;
        this.joinableFactory = joinableFactory;
        this.workerConfig = workerConfig;
        this.cache = cache;
        this.cacheConfig = cacheConfig;
        this.cachePopulatorStats = cachePopulatorStats;
        this.objectMapper = objectMapper;
        this.serviceEmitter = serviceEmitter;
        this.queryRunnerFactoryConglomerateProvider = provider;
        this.mergeExecutor = MoreExecutors.listeningDecorator(Execs.multiThreaded(workerConfig.getNumConcurrentMerges(), "unified-indexer-merge-pool-%d"));
    }

    /**
     * Creates a {@link StreamAppenderator} for the task identified by {@code str} and records it in the
     * bundle of {@code dataSchema.getDataSource()}, creating that bundle if it does not exist yet.
     *
     * <p>The appenderator is built with the bundle's walker, a memory-limit-rewritten copy of
     * {@code appenderatorConfig} and a pool-backed wrapper around {@code indexMerger}. The
     * {@code queryRunnerFactoryConglomerate}, {@code serviceEmitter}, {@code queryProcessingPool},
     * {@code joinableFactory}, {@code cacheConfig} and {@code cachePopulatorStats} arguments are not
     * used by this implementation.
     *
     * @return the newly created appenderator
     */
    @Override
    public Appenderator createRealtimeAppenderatorForTask(String str, DataSchema dataSchema, AppenderatorConfig appenderatorConfig, FireDepartmentMetrics fireDepartmentMetrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, IndexIO indexIO, IndexMerger indexMerger, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, DataSegmentAnnouncer dataSegmentAnnouncer, ServiceEmitter serviceEmitter, QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler, boolean z) {
        synchronized (this) {
            final DatasourceBundle bundle =
                    datasourceBundles.computeIfAbsent(dataSchema.getDataSource(), DatasourceBundle::new);
            final Appenderator created = new StreamAppenderator(
                    str,
                    dataSchema,
                    rewriteAppenderatorConfigMemoryLimits(appenderatorConfig),
                    fireDepartmentMetrics,
                    dataSegmentPusher,
                    objectMapper,
                    dataSegmentAnnouncer,
                    bundle.getWalker(),
                    indexIO,
                    wrapIndexMerger(indexMerger),
                    cache,
                    rowIngestionMeters,
                    parseExceptionHandler,
                    z
            );
            bundle.addAppenderator(str, created);
            return created;
        }
    }

    /**
     * Creates an appenderator via {@code Appenderators.createOffline} for the task identified by
     * {@code str} and records it in the bundle of {@code dataSchema.getDataSource()}, creating that
     * bundle if it does not exist yet.
     *
     * <p>The supplied config is replaced by a memory-limit-rewritten copy and the supplied merger by a
     * pool-backed wrapper before being handed to the factory method.
     *
     * @return the newly created appenderator
     */
    @Override
    public Appenderator createOfflineAppenderatorForTask(String str, DataSchema dataSchema, AppenderatorConfig appenderatorConfig, FireDepartmentMetrics fireDepartmentMetrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler, boolean z) {
        synchronized (this) {
            final DatasourceBundle bundle =
                    datasourceBundles.computeIfAbsent(dataSchema.getDataSource(), DatasourceBundle::new);
            final Appenderator created = Appenderators.createOffline(
                    str,
                    dataSchema,
                    rewriteAppenderatorConfigMemoryLimits(appenderatorConfig),
                    fireDepartmentMetrics,
                    dataSegmentPusher,
                    objectMapper,
                    indexIO,
                    wrapIndexMerger(indexMerger),
                    rowIngestionMeters,
                    parseExceptionHandler,
                    z
            );
            bundle.addAppenderator(str, created);
            return created;
        }
    }

    /**
     * Creates an appenderator via {@code Appenderators.createOpenSegmentsOffline} for the task
     * identified by {@code str} and records it in the bundle of {@code dataSchema.getDataSource()},
     * creating that bundle if it does not exist yet.
     *
     * <p>The supplied config is replaced by a memory-limit-rewritten copy and the supplied merger by a
     * pool-backed wrapper before being handed to the factory method.
     *
     * @return the newly created appenderator
     */
    @Override
    public Appenderator createOpenSegmentsOfflineAppenderatorForTask(String str, DataSchema dataSchema, AppenderatorConfig appenderatorConfig, FireDepartmentMetrics fireDepartmentMetrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler, boolean z) {
        synchronized (this) {
            final DatasourceBundle bundle =
                    datasourceBundles.computeIfAbsent(dataSchema.getDataSource(), DatasourceBundle::new);
            final Appenderator created = Appenderators.createOpenSegmentsOffline(
                    str,
                    dataSchema,
                    rewriteAppenderatorConfigMemoryLimits(appenderatorConfig),
                    fireDepartmentMetrics,
                    dataSegmentPusher,
                    objectMapper,
                    indexIO,
                    wrapIndexMerger(indexMerger),
                    rowIngestionMeters,
                    parseExceptionHandler,
                    z
            );
            bundle.addAppenderator(str, created);
            return created;
        }
    }

    /**
     * Creates an appenderator via {@code Appenderators.createClosedSegmentsOffline} for the task
     * identified by {@code str} and records it in the bundle of {@code dataSchema.getDataSource()},
     * creating that bundle if it does not exist yet.
     *
     * <p>The supplied config is replaced by a memory-limit-rewritten copy and the supplied merger by a
     * pool-backed wrapper before being handed to the factory method.
     *
     * @return the newly created appenderator
     */
    @Override
    public Appenderator createClosedSegmentsOfflineAppenderatorForTask(String str, DataSchema dataSchema, AppenderatorConfig appenderatorConfig, FireDepartmentMetrics fireDepartmentMetrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler, boolean z) {
        synchronized (this) {
            final DatasourceBundle bundle =
                    datasourceBundles.computeIfAbsent(dataSchema.getDataSource(), DatasourceBundle::new);
            final Appenderator created = Appenderators.createClosedSegmentsOffline(
                    str,
                    dataSchema,
                    rewriteAppenderatorConfigMemoryLimits(appenderatorConfig),
                    fireDepartmentMetrics,
                    dataSegmentPusher,
                    objectMapper,
                    indexIO,
                    wrapIndexMerger(indexMerger),
                    rowIngestionMeters,
                    parseExceptionHandler,
                    z
            );
            bundle.addAppenderator(str, created);
            return created;
        }
    }

    /**
     * Forgets the appenderators recorded for a task and drops the datasource bundle once no task entries
     * remain in it. Missing bundles or missing task entries are only logged at debug level.
     *
     * @param str  task identifier whose appenderators are removed
     * @param str2 name of the datasource whose bundle is consulted
     */
    @Override
    public void removeAppenderatorsForTask(String str, String str2) {
        synchronized (this) {
            final DatasourceBundle bundle = datasourceBundles.get(str2);
            if (bundle == null) {
                LOG.debug("Could not find datasource bundle for [%s], task [%s]", str2, str);
                return;
            }

            final List<Appenderator> removed = bundle.taskAppenderatorMap.remove(str);
            if (removed == null) {
                LOG.debug("Tried to remove appenderators for task [%s] but none were found.", str);
            }

            // The last task of this datasource is gone, so the bundle itself is discarded as well.
            if (bundle.taskAppenderatorMap.isEmpty()) {
                datasourceBundles.remove(str2);
            }
        }
    }

    /**
     * Resolves the bundle for the query's datasource via {@link #getBundle(Query)} and asks its walker
     * for an interval-based runner.
     *
     * @throws ISE if the query's datasource has no base table datasource
     * @throws IAE if no bundle is registered for the datasource
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> iterable) {
        final SinkQuerySegmentWalker walker = getBundle(query).getWalker();
        return walker.getQueryRunnerForIntervals(query, iterable);
    }

    /**
     * Resolves the bundle for the query's datasource via {@link #getBundle(Query)} and asks its walker
     * for a segment-descriptor-based runner.
     *
     * @throws ISE if the query's datasource has no base table datasource
     * @throws IAE if no bundle is registered for the datasource
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> iterable) {
        final SinkQuerySegmentWalker walker = getBundle(query).getWalker();
        return walker.getQueryRunnerForSegments(query, iterable);
    }

    /**
     * Finds the bundle registered under the name of the query's base table datasource.
     *
     * @param query query whose datasource is analysed
     * @return the matching bundle, never {@code null}
     * @throws ISE if the datasource analysis yields no base table datasource
     * @throws IAE if no bundle is registered under the base table's name
     */
    @VisibleForTesting
    public <T> DatasourceBundle getBundle(Query<T> query) {
        final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
        final TableDataSource baseTable = analysis
                .getBaseTableDataSource()
                .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
        final String tableName = baseTable.getName();

        final DatasourceBundle found;
        synchronized (this) {
            found = datasourceBundles.get(tableName);
        }

        if (found != null) {
            return found;
        }
        throw new IAE("Could not find segment walker for datasource [%s]", tableName);
    }

    /**
     * Reports whether tasks should make node announcements themselves.
     *
     * @return always {@code false} for this manager
     */
    @Override
    public boolean shouldTaskMakeNodeAnnouncements() {
        return false;
    }

    /**
     * Stops the merge executor with {@code shutdownNow()} and clears the reference. Calling this more
     * than once is harmless; later calls find the reference already cleared and do nothing.
     *
     * <p>The check and the reset run under {@code synchronized (this)}, the same monitor under which the
     * appenderator factory methods read the executor. Without it, two concurrent callers could both pass
     * the null check and one of them would then dereference the already-cleared field.
     */
    @Override
    public void shutdown() {
        synchronized (this) {
            if (mergeExecutor != null) {
                mergeExecutor.shutdownNow();
                mergeExecutor = null;
            }
        }
    }

    /**
     * Exposes the internal datasource-name-to-bundle map for tests.
     *
     * @return the live map held by this manager (not a copy)
     */
    @VisibleForTesting
    public Map<String, DatasourceBundle> getDatasourceBundles() {
        return datasourceBundles;
    }

    /**
     * Wraps {@code appenderatorConfig} so that its max-bytes-in-memory value becomes the worker's global
     * ingestion heap limit divided by the worker capacity.
     */
    private AppenderatorConfig rewriteAppenderatorConfigMemoryLimits(AppenderatorConfig appenderatorConfig) {
        final long perSlotByteLimit = workerConfig.getGlobalIngestionHeapLimitBytes() / workerConfig.getCapacity();
        return new MemoryParameterOverridingAppenderatorConfig(appenderatorConfig, perSlotByteLimit);
    }

    /** Decorates {@code indexMerger} so that its persist and merge work runs on this manager's merge executor. */
    private IndexMerger wrapIndexMerger(IndexMerger indexMerger) {
        final ListeningExecutorService sharedPool = mergeExecutor;
        return new LimitedPoolIndexMerger(indexMerger, sharedPool);
    }
}
