package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerHelper;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.class */
public class SinkQuerySegmentWalker implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
    private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
    private final String dataSource;
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
    private final ObjectMapper objectMapper;
    private final ServiceEmitter emitter;
    private final QueryRunnerFactoryConglomerate conglomerate;
    private final QueryProcessingPool queryProcessingPool;
    private final JoinableFactoryWrapper joinableFactoryWrapper;
    private final Cache cache;
    private final CacheConfig cacheConfig;
    private final CachePopulatorStats cachePopulatorStats;
    private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor> newIdToBasePendingSegment = new ConcurrentHashMap();

    public SinkQuerySegmentWalker(String str, VersionedIntervalTimeline<String, Sink> versionedIntervalTimeline, ObjectMapper objectMapper, ServiceEmitter serviceEmitter, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, QueryProcessingPool queryProcessingPool, JoinableFactoryWrapper joinableFactoryWrapper, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats) {
        this.dataSource = (String) Preconditions.checkNotNull(str, "dataSource");
        this.sinkTimeline = (VersionedIntervalTimeline) Preconditions.checkNotNull(versionedIntervalTimeline, "sinkTimeline");
        this.objectMapper = (ObjectMapper) Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.emitter = (ServiceEmitter) Preconditions.checkNotNull(serviceEmitter, "emitter");
        this.conglomerate = (QueryRunnerFactoryConglomerate) Preconditions.checkNotNull(queryRunnerFactoryConglomerate, "conglomerate");
        this.queryProcessingPool = (QueryProcessingPool) Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool");
        this.joinableFactoryWrapper = joinableFactoryWrapper;
        this.cache = (Cache) Preconditions.checkNotNull(cache, "cache");
        this.cacheConfig = (CacheConfig) Preconditions.checkNotNull(cacheConfig, "cacheConfig");
        this.cachePopulatorStats = (CachePopulatorStats) Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
        if (cache.isLocal()) {
            return;
        }
        log.warn("Configured cache[%s] is not local, caching will not be enabled.", new Object[]{cache.getClass().getName()});
    }

    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> iterable) {
        FunctionalIterable create = FunctionalIterable.create(iterable);
        VersionedIntervalTimeline<String, Sink> versionedIntervalTimeline = this.sinkTimeline;
        Objects.requireNonNull(versionedIntervalTimeline);
        return getQueryRunnerForSegments(query, create.transformCat(versionedIntervalTimeline::lookup).transformCat(timelineObjectHolder -> {
            return FunctionalIterable.create(timelineObjectHolder.getObject()).transform(partitionChunk -> {
                return new SegmentDescriptor(timelineObjectHolder.getInterval(), (String) timelineObjectHolder.getVersion(), partitionChunk.getChunkNumber());
            });
        }));
    }

    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> iterable) {
        QueryDataSource dataSource = query.getDataSource();
        if (!dataSource.getAnalysis().getBaseTableDataSource().filter(tableDataSource -> {
            return this.dataSource.equals(tableDataSource.getName());
        }).isPresent()) {
            throw new ISE("Cannot handle datasource: %s", new Object[]{dataSource});
        }
        QueryRunnerFactory findFactory = this.conglomerate.findFactory(query);
        if (findFactory == null) {
            throw new ISE("Unknown query type[%s].", new Object[]{query.getClass()});
        }
        QueryToolChest toolchest = findFactory.getToolchest();
        boolean z = query.context().getBoolean(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
        AtomicLong atomicLong = new AtomicLong(0L);
        if ((dataSource instanceof QueryDataSource) && !toolchest.canPerformSubquery(dataSource.getQuery())) {
            throw new ISE("Cannot handle subquery: %s", new Object[]{dataSource});
        }
        Function createSegmentMapFunction = dataSource.createSegmentMapFunction(query, atomicLong);
        Optional ofNullable = Optional.ofNullable(query.getDataSource().getCacheKey());
        return CPUTimeMetricQueryRunner.safeBuild(new FinalizeResultsQueryRunner(toolchest.mergeResults(findFactory.mergeRunners(this.queryProcessingPool, Iterables.transform(iterable, segmentDescriptor -> {
            SegmentDescriptor orDefault = this.newIdToBasePendingSegment.getOrDefault(segmentDescriptor, segmentDescriptor);
            PartitionChunk findChunk = this.sinkTimeline.findChunk(orDefault.getInterval(), orDefault.getVersion(), orDefault.getPartitionNumber());
            if (findChunk == null) {
                return new ReportTimelineMissingSegmentQueryRunner(orDefault);
            }
            Sink sink = (Sink) findChunk.getObject();
            SegmentId id = sink.getSegment().getId();
            return new SpecificSegmentQueryRunner(withPerSinkMetrics(new BySegmentQueryRunner(id, orDefault.getInterval().getStart(), findFactory.mergeRunners(DirectQueryProcessingPool.INSTANCE, new SinkQueryRunners(Iterables.transform(sink, fireHydrant -> {
                boolean hasSwapped = fireHydrant.hasSwapped();
                if (z && !hasSwapped) {
                    return new Pair(fireHydrant.getSegmentDataInterval(), new NoopQueryRunner());
                }
                Optional<Pair<SegmentReference, Closeable>> segmentForQuery = fireHydrant.getSegmentForQuery(createSegmentMapFunction);
                if (!segmentForQuery.isPresent()) {
                    return new Pair(fireHydrant.getSegmentDataInterval(), new ReportTimelineMissingSegmentQueryRunner(orDefault));
                }
                Pair<SegmentReference, Closeable> pair = segmentForQuery.get();
                try {
                    QueryRunner createRunner = findFactory.createRunner((Segment) pair.lhs);
                    if (hasSwapped && this.cache.isLocal()) {
                        StorageAdapter asStorageAdapter = ((SegmentReference) pair.lhs).asStorageAdapter();
                        createRunner = new CachingQueryRunner(makeHydrantCacheIdentifier(fireHydrant), ofNullable, orDefault, Intervals.utc(asStorageAdapter.getMinTime().getMillis(), asStorageAdapter.getMaxTime().getMillis() + 1), this.objectMapper, this.cache, toolchest, createRunner, new ForegroundCachePopulator(this.objectMapper, this.cachePopulatorStats, this.cacheConfig.getMaxEntrySize()), this.cacheConfig);
                    }
                    return new Pair(((SegmentReference) pair.lhs).getDataInterval(), QueryRunnerHelper.makeClosingQueryRunner(createRunner, (Closeable) pair.rhs));
                } catch (Throwable th) {
                    throw CloseableUtils.closeAndWrapInCatch(th, (Closeable) pair.rhs);
                }
            })))), toolchest, id, atomicLong), new SpecificSegmentSpec(orDefault));
        }))), toolchest), toolchest, this.emitter, atomicLong, true);
    }

    public void registerNewVersionOfPendingSegment(SegmentIdWithShardSpec segmentIdWithShardSpec, SegmentIdWithShardSpec segmentIdWithShardSpec2) {
        this.newIdToBasePendingSegment.put(segmentIdWithShardSpec2.asSegmentId().toDescriptor(), segmentIdWithShardSpec.asSegmentId().toDescriptor());
    }

    @VisibleForTesting
    String getDataSource() {
        return this.dataSource;
    }

    private <T> QueryRunner<T> withPerSinkMetrics(QueryRunner<T> queryRunner, QueryToolChest<T, ? extends Query<T>> queryToolChest, SegmentId segmentId, AtomicLong atomicLong) {
        String segmentId2 = segmentId.toString();
        return CPUTimeMetricQueryRunner.safeBuild(new MetricsEmittingQueryRunner(this.emitter, queryToolChest, new MetricsEmittingQueryRunner(this.emitter, queryToolChest, queryRunner, (v0, v1) -> {
            v0.reportSegmentTime(v1);
        }, queryMetrics -> {
            queryMetrics.segment(segmentId2);
        }), (v0, v1) -> {
            v0.reportSegmentAndCacheTime(v1);
        }, queryMetrics2 -> {
            queryMetrics2.segment(segmentId2);
        }).withWaitMeasuredFromNow(), queryToolChest, this.emitter, atomicLong, false);
    }

    public VersionedIntervalTimeline<String, Sink> getSinkTimeline() {
        return this.sinkTimeline;
    }

    public static String makeHydrantCacheIdentifier(FireHydrant fireHydrant) {
        return fireHydrant.getSegmentId() + "_" + fireHydrant.getCount();
    }
}
