/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.data.input.impl.prefetch;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import org.apache.druid.data.input.impl.FileIteratingFirehose;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.CacheManager;
import org.apache.druid.data.input.impl.prefetch.FileFetcher;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.data.input.impl.prefetch.OpenedObject;
import org.apache.druid.data.input.impl.prefetch.PrefetchConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;

/**
 * Base class for text-file firehose factories whose objects are read through a {@link FileFetcher}.
 *
 * <p>Each factory owns a {@link PrefetchConfig} and a {@link CacheManager} built from the constructor
 * arguments. {@link #connect} hands both to a new {@link FileFetcher} running on a dedicated
 * single-threaded executor, and exposes the fetched objects as a {@link FileIteratingFirehose} that
 * reads them line by line as UTF-8 text. {@link #connectForSampler} does the same with the
 * statically configured "disabled" cache manager and prefetch config.
 *
 * <p>The concrete caching and prefetching policy lives in {@link CacheManager}, {@link PrefetchConfig}
 * and {@link FileFetcher}, none of which are defined in this file.
 *
 * <p>NOTE(review): {@code objects} is lazily initialized without synchronization; this class appears
 * to assume {@code connect} is not called concurrently on one instance. Confirm against callers.
 *
 * @param <T> type of the objects (e.g. file handles or URIs) that subclasses know how to open
 */
public abstract class PrefetchableTextFilesFirehoseFactory<T> extends AbstractTextFilesFirehoseFactory<T> {
    private static final Logger LOG = new Logger(PrefetchableTextFilesFirehoseFactory.class);

    // Shared by every instance for sampling. Declared with a raw type because a static field cannot
    // refer to the class type parameter T. Built with a capacity of 0 bytes.
    private static final CacheManager DISABLED_CACHE_MANAGER = new CacheManager(0L);

    // Prefetch settings used for sampling: every limit (cache, fetch, trigger, timeout) is 0.
    private static final PrefetchConfig DISABLED_PREFETCH_CONFIG = new PrefetchConfig(0L, 0L, 0L, 0L);

    /** Number of fetch retries used when the constructor receives {@code null} for {@code maxFetchRetry}. */
    public static final int DEFAULT_MAX_FETCH_RETRY = 3;

    private final CacheManager<T> cacheManager;
    private final PrefetchConfig prefetchConfig;
    // Populated on the first connect call from initObjects() and then reused by later calls.
    private List<T> objects;
    private final int maxFetchRetry;

    /**
     * Creates a factory with the given prefetch settings.
     *
     * <p>The four {@code Long} arguments are passed to {@link PrefetchConfig} unchanged; how
     * {@code null} values are treated is decided there (presumably replaced by defaults; verify in
     * {@code PrefetchConfig}).
     *
     * @param maxCacheCapacityBytes cache capacity forwarded to {@link PrefetchConfig}
     * @param maxFetchCapacityBytes fetch capacity forwarded to {@link PrefetchConfig}
     * @param prefetchTriggerBytes  prefetch trigger threshold forwarded to {@link PrefetchConfig}
     * @param fetchTimeout          fetch timeout forwarded to {@link PrefetchConfig}; also used, in
     *                              milliseconds, as the executor shutdown wait when a firehose is closed
     * @param maxFetchRetry         maximum fetch retries, or {@code null} to use
     *                              {@link #DEFAULT_MAX_FETCH_RETRY}
     */
    public PrefetchableTextFilesFirehoseFactory(Long maxCacheCapacityBytes, Long maxFetchCapacityBytes, Long prefetchTriggerBytes, Long fetchTimeout, Integer maxFetchRetry) {
        this.prefetchConfig = new PrefetchConfig(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout);
        // The cache manager is sized from the resolved config value rather than the raw argument.
        this.cacheManager = new CacheManager<>(this.prefetchConfig.getMaxCacheCapacityBytes());
        this.maxFetchRetry = maxFetchRetry == null ? DEFAULT_MAX_FETCH_RETRY : maxFetchRetry;
    }

    /** Returns the cache capacity in bytes as reported by this factory's {@link CacheManager}. */
    @JsonProperty
    public long getMaxCacheCapacityBytes() {
        return this.cacheManager.getMaxCacheCapacityBytes();
    }

    /** Returns the fetch capacity in bytes from this factory's {@link PrefetchConfig}. */
    @JsonProperty
    public long getMaxFetchCapacityBytes() {
        return this.prefetchConfig.getMaxFetchCapacityBytes();
    }

    /** Returns the prefetch trigger threshold in bytes from this factory's {@link PrefetchConfig}. */
    @JsonProperty
    public long getPrefetchTriggerBytes() {
        return this.prefetchConfig.getPrefetchTriggerBytes();
    }

    /** Returns the fetch timeout from this factory's {@link PrefetchConfig}. */
    @JsonProperty
    public long getFetchTimeout() {
        return this.prefetchConfig.getFetchTimeout();
    }

    /** Returns the maximum number of fetch retries passed to each {@link FileFetcher}. */
    @JsonProperty
    public int getMaxFetchRetry() {
        return this.maxFetchRetry;
    }

    /** Exposes the instance's cache manager to tests. */
    @VisibleForTesting
    CacheManager<T> getCacheManager() {
        return this.cacheManager;
    }

    /**
     * Creates a firehose using this factory's own prefetch config and cache manager.
     *
     * @param firehoseParser     parser handed to the returned {@link FileIteratingFirehose}
     * @param temporaryDirectory directory handed to the {@link FileFetcher}; must be an existing
     *                           directory when the cache manager is enabled or the fetch capacity is
     *                           positive, and may be {@code null} otherwise
     * @return a firehose iterating over the lines of all objects
     * @throws IOException declared for subclasses and collaborators; not thrown directly here
     */
    @Override
    public Firehose connect(StringInputRowParser firehoseParser, @Nullable File temporaryDirectory) throws IOException {
        return this.connectInternal(firehoseParser, temporaryDirectory, this.prefetchConfig, this.cacheManager);
    }

    /**
     * Creates a firehose for sampling, using the shared disabled cache manager and prefetch config
     * instead of this instance's settings.
     *
     * @param parser             parser handed to the returned {@link FileIteratingFirehose}
     * @param temporaryDirectory directory handed to the {@link FileFetcher}; may be {@code null}
     * @return a firehose iterating over the lines of all objects
     * @throws IOException declared for subclasses and collaborators; not thrown directly here
     */
    @Override
    public Firehose connectForSampler(StringInputRowParser parser, @Nullable File temporaryDirectory) throws IOException {
        return this.connectInternal(parser, temporaryDirectory, DISABLED_PREFETCH_CONFIG, DISABLED_CACHE_MANAGER);
    }

    /**
     * Shared implementation of {@link #connect} and {@link #connectForSampler}.
     *
     * <p>Resolves the object list on first use, validates the temporary directory when it is going
     * to be needed, starts a single-threaded fetch executor and wires a {@link FileFetcher} into a
     * {@link FileIteratingFirehose}. Closing the firehose shuts the executor down.
     */
    private Firehose connectInternal(StringInputRowParser firehoseParser, @Nullable File temporaryDirectory, PrefetchConfig prefetchConfig, CacheManager cacheManager) throws IOException {
        if (this.objects == null) {
            // Snapshot the objects once; later connect calls reuse the same immutable list.
            this.objects = ImmutableList.copyOf(Preconditions.checkNotNull(this.initObjects(), "objects"));
        }
        // The directory is only validated when caching or fetching to local storage is active.
        if (cacheManager.isEnabled() || prefetchConfig.getMaxFetchCapacityBytes() > 0L) {
            Preconditions.checkNotNull(temporaryDirectory, "temporaryDirectory");
            Preconditions.checkArgument(temporaryDirectory.exists(), "temporaryDirectory[%s] does not exist", temporaryDirectory);
            Preconditions.checkArgument(temporaryDirectory.isDirectory(), "temporaryDirectory[%s] is not a directory", temporaryDirectory);
        }
        LOG.info("Create a new firehose for [%d] objects", this.objects.size());
        final ExecutorService fetchExecutor = Execs.singleThreaded("firehose_fetch_%d");
        // cacheManager is raw (see DISABLED_CACHE_MANAGER), hence the unchecked conversion here.
        final FileFetcher<T> fetcher = new FileFetcher<T>(cacheManager, this.objects, fetchExecutor, temporaryDirectory, prefetchConfig, new ObjectOpenFunction<T>() {

            @Override
            public InputStream open(T object) throws IOException {
                return PrefetchableTextFilesFirehoseFactory.this.openObjectStream(object);
            }

            @Override
            public InputStream open(T object, long start) throws IOException {
                return PrefetchableTextFilesFirehoseFactory.this.openObjectStream(object, start);
            }
        }, this.getRetryCondition(), this.getMaxFetchRetry());
        return new FileIteratingFirehose(new Iterator<LineIterator>() {

            @Override
            public boolean hasNext() {
                return fetcher.hasNext();
            }

            @Override
            public LineIterator next() {
                if (!this.hasNext()) {
                    throw new NoSuchElementException();
                }
                final OpenedObject<T> openedObject = fetcher.next();
                try {
                    // The returned iterator owns the object's resource closer and runs it on close().
                    return new ResourceCloseableLineIterator(
                        new InputStreamReader(
                            PrefetchableTextFilesFirehoseFactory.this.wrapObjectStream(openedObject.getObject(), openedObject.getObjectStream()),
                            StandardCharsets.UTF_8
                        ),
                        openedObject.getResourceCloser()
                    );
                }
                catch (IOException e) {
                    // Wrapping failed, so nobody else will release the opened object: do it here and
                    // keep any secondary failure attached to the original exception.
                    try {
                        openedObject.getResourceCloser().close();
                    }
                    catch (Throwable t) {
                        e.addSuppressed(t);
                    }
                    throw new RuntimeException(e);
                }
            }
        }, firehoseParser, () -> {
            fetchExecutor.shutdownNow();
            try {
                Preconditions.checkState(
                    fetchExecutor.awaitTermination(prefetchConfig.getFetchTimeout(), TimeUnit.MILLISECONDS),
                    "Fetch executor did not terminate within [%s] ms",
                    prefetchConfig.getFetchTimeout()
                );
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag for callers and keep the interrupt as the cause.
                Thread.currentThread().interrupt();
                throw new ISE(e, "Failed to shutdown fetch executor during close");
            }
        });
    }

    /**
     * Returns the predicate handed to {@link FileFetcher} as its retry condition.
     *
     * @return predicate over the throwable raised by a failed fetch (presumably {@code true} means
     *         "retry"; confirm against {@code FileFetcher})
     */
    protected abstract Predicate<Throwable> getRetryCondition();

    /**
     * Opens a stream for the given object starting at the given position.
     *
     * @param object the object to open
     * @param start  start position passed through from {@link ObjectOpenFunction#open(Object, long)}
     *               (presumably a byte offset; confirm against {@code FileFetcher})
     * @return an input stream over the object's content
     * @throws IOException if the object cannot be opened
     */
    protected abstract InputStream openObjectStream(T object, long start) throws IOException;

    /**
     * A {@link LineIterator} that additionally closes an associated resource when it is closed.
     */
    static class ResourceCloseableLineIterator extends LineIterator {
        private final Closeable resourceCloser;

        /**
         * @param reader         source of the lines
         * @param resourceCloser resource to close together with this iterator
         * @throws IllegalArgumentException propagated from the {@link LineIterator} constructor
         */
        ResourceCloseableLineIterator(Reader reader, Closeable resourceCloser) throws IllegalArgumentException {
            super(reader);
            this.resourceCloser = resourceCloser;
        }

        /**
         * Closes the underlying line iterator first and then the associated resource; the resource is
         * closed even if closing the iterator throws.
         *
         * @throws RuntimeException wrapping any {@link IOException} raised while closing
         */
        @Override
        public void close() {
            try (Closeable ignored = this.resourceCloser) {
                super.close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

