package org.apache.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.prefetch.CacheManager;
import org.apache.druid.data.input.impl.prefetch.JsonIterator;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.data.input.impl.prefetch.OpenedObject;
import org.apache.druid.data.input.impl.prefetch.PrefetchConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;

/* loaded from: input_file:org/apache/druid/segment/realtime/firehose/PrefetchSqlFirehoseFactory.class */
public abstract class PrefetchSqlFirehoseFactory<T> implements FirehoseFactory<InputRowParser<Map<String, Object>>> {
    private static final Logger LOG = new Logger(PrefetchSqlFirehoseFactory.class);
    private final PrefetchConfig prefetchConfig;
    private final CacheManager<T> cacheManager;
    private List<T> objects;
    private ObjectMapper objectMapper;

    public PrefetchSqlFirehoseFactory(Long l, Long l2, Long l3, Long l4, ObjectMapper objectMapper) {
        this.prefetchConfig = new PrefetchConfig(l, l2, l3, l4);
        this.cacheManager = new CacheManager<>(this.prefetchConfig.getMaxCacheCapacityBytes());
        this.objectMapper = objectMapper;
    }

    @JsonProperty
    public long getMaxCacheCapacityBytes() {
        return this.cacheManager.getMaxCacheCapacityBytes();
    }

    @JsonProperty
    public long getMaxFetchCapacityBytes() {
        return this.prefetchConfig.getMaxFetchCapacityBytes();
    }

    @JsonProperty
    public long getPrefetchTriggerBytes() {
        return this.prefetchConfig.getPrefetchTriggerBytes();
    }

    @JsonProperty
    public long getFetchTimeout() {
        return this.prefetchConfig.getFetchTimeout();
    }

    @VisibleForTesting
    CacheManager<T> getCacheManager() {
        return this.cacheManager;
    }

    public Firehose connect(InputRowParser<Map<String, Object>> inputRowParser, @Nullable final File file) {
        if (this.objects == null) {
            this.objects = ImmutableList.copyOf((Collection) Preconditions.checkNotNull(initObjects(), "objects"));
        }
        if (this.cacheManager.isEnabled() || this.prefetchConfig.getMaxFetchCapacityBytes() > 0) {
            Preconditions.checkNotNull(file, "temporaryDirectory");
            Preconditions.checkArgument(file.exists(), "temporaryDirectory[%s] does not exist", new Object[]{file});
            Preconditions.checkArgument(file.isDirectory(), "temporaryDirectory[%s] is not a directory", new Object[]{file});
        }
        LOG.info("Create a new firehose for [%d] queries", new Object[]{Integer.valueOf(this.objects.size())});
        ExecutorService singleThreaded = Execs.singleThreaded("firehose_fetch_%d");
        final SqlFetcher sqlFetcher = new SqlFetcher(this.cacheManager, this.objects, singleThreaded, file, this.prefetchConfig, new ObjectOpenFunction<T>() { // from class: org.apache.druid.segment.realtime.firehose.PrefetchSqlFirehoseFactory.1
            public InputStream open(T t, File file2) throws IOException {
                return PrefetchSqlFirehoseFactory.this.openObjectStream(t, file2);
            }

            public InputStream open(T t) throws IOException {
                return PrefetchSqlFirehoseFactory.this.openObjectStream(t, File.createTempFile("sqlresults_", null, file));
            }
        });
        return new SqlFirehose(new Iterator<JsonIterator<Map<String, Object>>>() { // from class: org.apache.druid.segment.realtime.firehose.PrefetchSqlFirehoseFactory.2
            @Override // java.util.Iterator
            public boolean hasNext() {
                return sqlFetcher.hasNext();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public JsonIterator<Map<String, Object>> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                try {
                    TypeReference<Map<String, Object>> typeReference = new TypeReference<Map<String, Object>>() { // from class: org.apache.druid.segment.realtime.firehose.PrefetchSqlFirehoseFactory.2.1
                    };
                    OpenedObject next = sqlFetcher.next();
                    return new JsonIterator<>(typeReference, next.getObjectStream(), next.getResourceCloser(), PrefetchSqlFirehoseFactory.this.objectMapper);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }, inputRowParser, () -> {
            singleThreaded.shutdownNow();
            try {
                Preconditions.checkState(singleThreaded.awaitTermination(this.prefetchConfig.getFetchTimeout(), TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ISE("Failed to shutdown fetch executor during close", new Object[0]);
            }
        });
    }

    protected abstract InputStream openObjectStream(T t, File file) throws IOException;

    protected abstract Collection<T> initObjects();
}
