package co.cask.cdap.logging.read;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.logging.LoggingConfiguration;
import co.cask.cdap.logging.context.LoggingContextHelper;
import co.cask.cdap.logging.filter.AndFilter;
import co.cask.cdap.logging.filter.Filter;
import co.cask.cdap.logging.save.LogSaverTableUtil;
import co.cask.cdap.logging.serialize.LogSchema;
import co.cask.cdap.logging.write.FileMetaDataManager;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.LocalLocationFactory;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/read/StandaloneLogReader.class */
public class StandaloneLogReader implements LogReader {
    private static final Logger LOG = LoggerFactory.getLogger(StandaloneLogReader.class);
    private static final int MAX_THREAD_POOL_SIZE = 20;
    private final FileMetaDataManager fileMetaDataManager;
    private final Schema schema;
    private final ExecutorService executor;

    /* loaded from: input_file:co/cask/cdap/logging/read/StandaloneLogReader$CountingCallback.class */
    private static class CountingCallback implements Callback {
        private final Callback callback;
        private final AtomicInteger count;

        private CountingCallback(Callback callback) {
            this.count = new AtomicInteger(0);
            this.callback = callback;
        }

        @Override // co.cask.cdap.logging.read.Callback
        public void init() {
        }

        @Override // co.cask.cdap.logging.read.Callback
        public void handle(LogEvent logEvent) {
            this.count.incrementAndGet();
            this.callback.handle(logEvent);
        }

        public int getCount() {
            return this.count.get();
        }

        @Override // co.cask.cdap.logging.read.Callback
        public void close() {
        }
    }

    @Inject
    public StandaloneLogReader(CConfiguration cConfiguration, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, LocalLocationFactory localLocationFactory) {
        Preconditions.checkNotNull(cConfiguration.get(LoggingConfiguration.LOG_BASE_DIR), "Log base dir cannot be null");
        try {
            this.schema = new LogSchema().getAvroSchema();
            this.fileMetaDataManager = new FileMetaDataManager(new LogSaverTableUtil(datasetFramework, cConfiguration), transactionSystemClient, localLocationFactory);
            this.executor = new ThreadPoolExecutor(0, MAX_THREAD_POOL_SIZE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), Threads.createDaemonThreadFactory("single-log-reader-%d"), new ThreadPoolExecutor.DiscardPolicy());
        } catch (Exception e) {
            LOG.error("Got exception", e);
            throw Throwables.propagate(e);
        }
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLogNext(final LoggingContext loggingContext, final long j, final int i, final Filter filter, final Callback callback) {
        if (j < 0) {
            getLogPrev(loggingContext, -1L, i, filter, callback);
        } else {
            this.executor.submit(new Runnable() { // from class: co.cask.cdap.logging.read.StandaloneLogReader.1
                @Override // java.lang.Runnable
                public void run() {
                    callback.init();
                    try {
                        try {
                            AndFilter andFilter = new AndFilter(ImmutableList.of(LoggingContextHelper.createFilter(loggingContext), filter));
                            long j2 = j + 1;
                            SortedMap<Long, Location> listFiles = StandaloneLogReader.this.fileMetaDataManager.listFiles(loggingContext);
                            if (listFiles.isEmpty()) {
                                return;
                            }
                            long j3 = -1;
                            Location location = null;
                            ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(listFiles.size());
                            for (Map.Entry<Long, Location> entry : listFiles.entrySet()) {
                                if (entry.getKey().longValue() >= j2 && location != null) {
                                    newArrayListWithExpectedSize.add(location);
                                }
                                j3 = entry.getKey().longValue();
                                location = entry.getValue();
                            }
                            if (j3 != -1) {
                                newArrayListWithExpectedSize.add(location);
                            }
                            AvroFileLogReader avroFileLogReader = new AvroFileLogReader(StandaloneLogReader.this.schema);
                            CountingCallback countingCallback = new CountingCallback(callback);
                            Iterator it = newArrayListWithExpectedSize.iterator();
                            while (it.hasNext()) {
                                avroFileLogReader.readLog((Location) it.next(), andFilter, j2, Long.MAX_VALUE, i - countingCallback.getCount(), countingCallback);
                                if (countingCallback.getCount() >= i) {
                                    break;
                                }
                            }
                            callback.close();
                        } catch (Throwable th) {
                            StandaloneLogReader.LOG.error("Got exception: ", th);
                            throw Throwables.propagate(th);
                        }
                    } finally {
                        callback.close();
                    }
                }
            });
        }
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLogPrev(final LoggingContext loggingContext, final long j, final int i, final Filter filter, final Callback callback) {
        this.executor.submit(new Runnable() { // from class: co.cask.cdap.logging.read.StandaloneLogReader.2
            @Override // java.lang.Runnable
            public void run() {
                RuntimeException propagate;
                callback.init();
                try {
                    try {
                        AndFilter andFilter = new AndFilter(ImmutableList.of(LoggingContextHelper.createFilter(loggingContext), filter));
                        ImmutableSortedMap copyOf = ImmutableSortedMap.copyOf(StandaloneLogReader.this.fileMetaDataManager.listFiles(loggingContext), Collections.reverseOrder());
                        if (copyOf.isEmpty()) {
                            return;
                        }
                        long currentTimeMillis = j >= 0 ? j - 1 : System.currentTimeMillis();
                        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(copyOf.size());
                        for (Map.Entry entry : copyOf.entrySet()) {
                            if (((Long) entry.getKey()).longValue() <= currentTimeMillis) {
                                newArrayListWithExpectedSize.add(entry.getValue());
                            }
                        }
                        LinkedList newLinkedList = Lists.newLinkedList();
                        AvroFileLogReader avroFileLogReader = new AvroFileLogReader(StandaloneLogReader.this.schema);
                        int i2 = 0;
                        Iterator it = newArrayListWithExpectedSize.iterator();
                        while (it.hasNext()) {
                            Collection<LogEvent> readLogPrev = avroFileLogReader.readLogPrev((Location) it.next(), andFilter, currentTimeMillis, i - i2);
                            newLinkedList.add(readLogPrev);
                            i2 += readLogPrev.size();
                            if (i2 >= i) {
                                break;
                            }
                        }
                        Iterator it2 = Iterables.concat(Lists.reverse(newLinkedList)).iterator();
                        while (it2.hasNext()) {
                            callback.handle((LogEvent) it2.next());
                        }
                        callback.close();
                    } finally {
                    }
                } finally {
                    callback.close();
                }
            }
        });
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLog(final LoggingContext loggingContext, final long j, final long j2, final Filter filter, final Callback callback) {
        this.executor.submit(new Runnable() { // from class: co.cask.cdap.logging.read.StandaloneLogReader.3
            @Override // java.lang.Runnable
            public void run() {
                callback.init();
                try {
                    try {
                        AndFilter andFilter = new AndFilter(ImmutableList.of(LoggingContextHelper.createFilter(loggingContext), filter));
                        SortedMap<Long, Location> listFiles = StandaloneLogReader.this.fileMetaDataManager.listFiles(loggingContext);
                        if (listFiles.isEmpty()) {
                            return;
                        }
                        long j3 = -1;
                        Location location = null;
                        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(listFiles.size());
                        for (Map.Entry<Long, Location> entry : listFiles.entrySet()) {
                            if (entry.getKey().longValue() >= j && j3 != -1 && j3 < j2) {
                                newArrayListWithExpectedSize.add(location);
                            }
                            j3 = entry.getKey().longValue();
                            location = entry.getValue();
                        }
                        if (j3 != -1 && j3 < j2) {
                            newArrayListWithExpectedSize.add(location);
                        }
                        AvroFileLogReader avroFileLogReader = new AvroFileLogReader(StandaloneLogReader.this.schema);
                        Iterator it = newArrayListWithExpectedSize.iterator();
                        while (it.hasNext()) {
                            avroFileLogReader.readLog((Location) it.next(), andFilter, j, j2, Integer.MAX_VALUE, callback);
                        }
                        callback.close();
                    } catch (Throwable th) {
                        StandaloneLogReader.LOG.error("Got exception: ", th);
                        throw Throwables.propagate(th);
                    }
                } finally {
                    callback.close();
                }
            }
        });
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void close() {
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
    }
}
