package org.apache.hive.druid.org.apache.druid.data.input.impl.prefetch;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.io.LineIterator;
import org.apache.hive.druid.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hive.druid.com.google.common.annotations.VisibleForTesting;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Predicate;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.org.apache.druid.data.input.Firehose;
import org.apache.hive.druid.org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import org.apache.hive.druid.org.apache.druid.data.input.impl.FileIteratingFirehose;
import org.apache.hive.druid.org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.hive.druid.org.apache.druid.java.util.common.ISE;
import org.apache.hive.druid.org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.hive.druid.org.apache.druid.java.util.common.logger.Logger;

/* loaded from: input_file:org/apache/hive/druid/org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.class */
public abstract class PrefetchableTextFilesFirehoseFactory<T> extends AbstractTextFilesFirehoseFactory<T> {
    private static final Logger LOG = new Logger(PrefetchableTextFilesFirehoseFactory.class);
    private static final CacheManager DISABLED_CACHE_MANAGER = new CacheManager(0);
    private static final FetchConfig DISABLED_PREFETCH_CONFIG = new FetchConfig(0L, 0L, 0L, 0L, 0);
    private final CacheManager<T> cacheManager;
    private final FetchConfig fetchConfig;
    private List<T> objects;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hive/druid/org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory$ResourceCloseableLineIterator.class */
    public static class ResourceCloseableLineIterator extends LineIterator {
        private final Closeable resourceCloser;

        ResourceCloseableLineIterator(Reader reader, Closeable closeable) throws IllegalArgumentException {
            super(reader);
            this.resourceCloser = closeable;
        }

        public void close() {
            try {
                Closeable closeable = this.resourceCloser;
                Throwable th = null;
                try {
                    super.close();
                    if (closeable != null) {
                        if (0 != 0) {
                            try {
                                closeable.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            closeable.close();
                        }
                    }
                } finally {
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public PrefetchableTextFilesFirehoseFactory(@Nullable Long l, @Nullable Long l2, @Nullable Long l3, @Nullable Long l4, @Nullable Integer num) {
        this.fetchConfig = new FetchConfig(l, l2, l3, l4, num);
        this.cacheManager = new CacheManager<>(this.fetchConfig.getMaxCacheCapacityBytes());
    }

    @JsonProperty
    public long getMaxCacheCapacityBytes() {
        return this.cacheManager.getMaxCacheCapacityBytes();
    }

    @JsonProperty
    public long getMaxFetchCapacityBytes() {
        return this.fetchConfig.getMaxFetchCapacityBytes();
    }

    @JsonProperty
    public long getPrefetchTriggerBytes() {
        return this.fetchConfig.getPrefetchTriggerBytes();
    }

    @JsonProperty
    public long getFetchTimeout() {
        return this.fetchConfig.getFetchTimeout();
    }

    @JsonProperty
    public int getMaxFetchRetry() {
        return this.fetchConfig.getMaxFetchRetry();
    }

    @VisibleForTesting
    CacheManager<T> getCacheManager() {
        return this.cacheManager;
    }

    @Override // org.apache.hive.druid.org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory, org.apache.hive.druid.org.apache.druid.data.input.FirehoseFactory
    public Firehose connect(StringInputRowParser stringInputRowParser, @Nullable File file) throws IOException {
        return connectInternal(stringInputRowParser, file, this.fetchConfig, this.cacheManager);
    }

    @Override // org.apache.hive.druid.org.apache.druid.data.input.FirehoseFactory
    public Firehose connectForSampler(StringInputRowParser stringInputRowParser, @Nullable File file) throws IOException {
        return connectInternal(stringInputRowParser, file, DISABLED_PREFETCH_CONFIG, DISABLED_CACHE_MANAGER);
    }

    private Firehose connectInternal(StringInputRowParser stringInputRowParser, @Nullable File file, FetchConfig fetchConfig, CacheManager cacheManager) throws IOException {
        if (this.objects == null) {
            this.objects = ImmutableList.copyOf((Collection) Preconditions.checkNotNull(initObjects(), "objects"));
        }
        if (cacheManager.isEnabled() || fetchConfig.getMaxFetchCapacityBytes() > 0) {
            Preconditions.checkNotNull(file, "temporaryDirectory");
            Preconditions.checkArgument(file.exists(), "temporaryDirectory[%s] does not exist", file);
            Preconditions.checkArgument(file.isDirectory(), "temporaryDirectory[%s] is not a directory", file);
        }
        LOG.info("Create a new firehose for [%d] objects", Integer.valueOf(this.objects.size()));
        ExecutorService singleThreaded = Execs.singleThreaded("firehose_fetch_%d");
        final FileFetcher fileFetcher = new FileFetcher(cacheManager, this.objects, singleThreaded, file, fetchConfig, new ObjectOpenFunction<T>() { // from class: org.apache.hive.druid.org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory.1
            @Override // org.apache.hive.druid.org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction
            public InputStream open(T t) throws IOException {
                return PrefetchableTextFilesFirehoseFactory.this.openObjectStream(t);
            }

            @Override // org.apache.hive.druid.org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction
            public InputStream open(T t, long j) throws IOException {
                return PrefetchableTextFilesFirehoseFactory.this.openObjectStream(t, j);
            }
        }, getRetryCondition());
        return new FileIteratingFirehose(new Iterator<LineIterator>() { // from class: org.apache.hive.druid.org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory.2
            @Override // java.util.Iterator
            public boolean hasNext() {
                return fileFetcher.hasNext();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public LineIterator next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                OpenObject<T> next = fileFetcher.next();
                try {
                    return new ResourceCloseableLineIterator(new InputStreamReader(PrefetchableTextFilesFirehoseFactory.this.wrapObjectStream(next.getObject(), next.getObjectStream()), StandardCharsets.UTF_8), next.getResourceCloser());
                } catch (IOException e) {
                    try {
                        next.getResourceCloser().close();
                    } catch (Throwable th) {
                        e.addSuppressed(th);
                    }
                    throw new RuntimeException(e);
                }
            }
        }, stringInputRowParser, () -> {
            singleThreaded.shutdownNow();
            try {
                Preconditions.checkState(singleThreaded.awaitTermination(fetchConfig.getFetchTimeout(), TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ISE("Failed to shutdown fetch executor during close", new Object[0]);
            }
        });
    }

    protected abstract Predicate<Throwable> getRetryCondition();

    protected abstract InputStream openObjectStream(T t, long j) throws IOException;
}
