package com.addthis.basis.collect.dbq;

import com.addthis.basis.jvm.Shutdown;
import com.addthis.basis.util.LessFiles;
import com.addthis.basis.util.LessPaths;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.zip.GZIPInputStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/addthis/basis/collect/dbq/DiskBackedQueueInternals.class */
public class DiskBackedQueueInternals<E> implements Closeable {
    private static final Logger log;
    final int pageSize;
    final int minReadPages;
    final int minWritePages;
    final int maxPages;
    final long maxDiskBytes;
    final Serializer<E> serializer;
    final Duration terminationWait;

    @Nonnull
    final GZIPOptions gzipOptions;
    final boolean silent;
    final boolean memoryDouble;

    @GuardedBy("external")
    final Path external;

    @Nullable
    private final ConcurrentSkipListMap<Long, Page<E>> diskQueue;

    @Nullable
    final AtomicInteger diskQueueSize;

    @GuardedBy("lock")
    private Page<E> writePage;

    @GuardedBy("lock")
    private Page<E> readPage;
    private final ScheduledThreadPoolExecutor executor;
    private static final ScheduledThreadPoolExecutor sharedExecutor;
    private final List<ScheduledFuture<?>> backgroundTasks;
    private static final Path SIZEFILE;
    private static final Predicate<Path> EXCLUDE_SIZEFILE;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final ReentrantLock lock = new ReentrantLock();

    @GuardedBy("lock")
    private final Condition notEmpty = this.lock.newCondition();

    @GuardedBy("lock")
    private final Condition notFull = this.lock.newCondition();
    final AtomicLong diskByteUsage = new AtomicLong();
    final AtomicLong queueSize = new AtomicLong();

    @GuardedBy("lock")
    private final NavigableMap<Long, Page<E>> readQueue = new TreeMap();
    private final AtomicReference<IOException> error = new AtomicReference<>(null);
    final AtomicInteger backgroundActiveTasks = new AtomicInteger();
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final AtomicLong fastWrite = new AtomicLong();
    private final AtomicLong slowWrite = new AtomicLong();
    private final Semaphore loadPageSemaphore = new Semaphore(1);

    /* loaded from: input_file:com/addthis/basis/collect/dbq/DiskBackedQueueInternals$DiskWriteTask.class */
    private class DiskWriteTask implements Runnable {
        private DiskWriteTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Map.Entry pollFirstEntry;
            DiskBackedQueueInternals.this.backgroundActiveTasks.getAndIncrement();
            while (DiskBackedQueueInternals.this.getError() == null && DiskBackedQueueInternals.this.diskQueueSize.get() > DiskBackedQueueInternals.this.minWritePages && (pollFirstEntry = DiskBackedQueueInternals.this.diskQueue.pollFirstEntry()) != null) {
                try {
                    DiskBackedQueueInternals.this.diskQueueSize.decrementAndGet();
                    ((Page) pollFirstEntry.getValue()).writeToFile();
                    DiskBackedQueueInternals.this.signalConsumers(((Long) pollFirstEntry.getKey()).longValue());
                    if (Thread.interrupted()) {
                        break;
                    }
                } catch (IOException e) {
                    DiskBackedQueueInternals.this.setError(e);
                    return;
                } finally {
                    DiskBackedQueueInternals.this.backgroundActiveTasks.getAndDecrement();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/addthis/basis/collect/dbq/DiskBackedQueueInternals$GZIPOptions.class */
    public static class GZIPOptions {
        final boolean compress;
        final int compressionBuffer;
        final int compressionLevel;

        public GZIPOptions(boolean z, int i, int i2) {
            this.compress = z;
            this.compressionBuffer = i;
            this.compressionLevel = i2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DiskBackedQueueInternals(int i, int i2, int i3, long j, int i4, Path path, Serializer<E> serializer, Duration duration, boolean z, boolean z2, boolean z3, int i5, int i6, boolean z4, boolean z5) throws IOException {
        this.pageSize = i;
        this.maxPages = i3;
        this.maxDiskBytes = j;
        this.external = path;
        this.serializer = serializer;
        this.terminationWait = duration;
        this.silent = z2;
        this.gzipOptions = new GZIPOptions(z3, i6, i5);
        this.memoryDouble = z4;
        if (i3 <= 1) {
            this.diskQueue = null;
            this.diskQueueSize = null;
            this.executor = null;
            this.backgroundTasks = null;
            this.minReadPages = i2;
            this.minWritePages = 0;
        } else {
            this.diskQueue = new ConcurrentSkipListMap<>();
            this.diskQueueSize = new AtomicInteger();
            if (z5) {
                this.executor = null;
                sharedExecutor.setCorePoolSize(Math.max(i4, sharedExecutor.getCorePoolSize()));
            } else {
                this.executor = new ScheduledThreadPoolExecutor(i4, new ThreadFactoryBuilder().setNameFormat("disk-backed-queue-writer-%d").build());
            }
            this.minReadPages = Math.max(i2 / 2, 1);
            this.minWritePages = i2 / 2;
            this.backgroundTasks = new ArrayList(i4);
            for (int i7 = 0; i7 < i4; i7++) {
                this.backgroundTasks.add((z5 ? sharedExecutor : this.executor).scheduleWithFixedDelay(new DiskWriteTask(), 0L, 10L, TimeUnit.MILLISECONDS));
            }
        }
        Files.createDirectories(this.external, new FileAttribute[0]);
        Optional<Path> min = Files.list(this.external).filter(EXCLUDE_SIZEFILE).min((path2, path3) -> {
            return path2.getFileName().toString().compareTo(path3.getFileName().toString());
        });
        Optional<Path> max = Files.list(this.external).filter(EXCLUDE_SIZEFILE).max((path4, path5) -> {
            return path5.getFileName().toString().compareTo(path4.getFileName().toString());
        });
        if (min.isPresent() && max.isPresent()) {
            long parseLong = Long.parseLong(min.get().getFileName().toString());
            long parseLong2 = Long.parseLong(max.get().getFileName().toString()) + 1;
            NavigableMap<Long, Page<E>> readPagesFromExternal = readPagesFromExternal(parseLong, this.minReadPages);
            this.readPage = (Page) readPagesFromExternal.remove(Long.valueOf(parseLong));
            this.readQueue.putAll(readPagesFromExternal);
            this.writePage = new Page<>(this, parseLong2);
            this.diskByteUsage.set(LessFiles.directorySize(this.external.toFile()));
            this.queueSize.set(Long.parseLong(new String(Files.readAllBytes(this.external.resolve("size")))));
        } else {
            this.writePage = new Page<>(this, 0L);
            this.readPage = this.writePage;
        }
        if (z) {
            Shutdown.tryAddShutdownHook(new Thread(this::close, "disk-backed-queue-shutdown"));
        }
    }

    private boolean flushQueue(NavigableMap<Long, Page<E>> navigableMap, long j, boolean z) throws IOException {
        if (j > 0 && System.currentTimeMillis() >= j) {
            return false;
        }
        while (true) {
            Map.Entry<Long, Page<E>> pollFirstEntry = navigableMap.pollFirstEntry();
            if (pollFirstEntry == null) {
                return true;
            }
            if (z) {
                this.diskQueueSize.decrementAndGet();
            }
            pollFirstEntry.getValue().writeToFile();
            signalConsumers(pollFirstEntry.getKey().longValue());
            if (j > 0 && System.currentTimeMillis() > j) {
                return false;
            }
        }
    }

    private NavigableMap<Long, Page<E>> readPagesFromExternal(long j, int i) throws IOException {
        TreeMap treeMap = new TreeMap();
        long j2 = j;
        while (true) {
            long j3 = j2;
            if (j3 >= j + i) {
                return treeMap;
            }
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            synchronized (this.external) {
                Path resolve = this.external.resolve(Long.toString(j3));
                if (!Files.exists(resolve, new LinkOption[0])) {
                    return treeMap;
                }
                InputStream newInputStream = Files.newInputStream(resolve, new OpenOption[0]);
                if (this.gzipOptions.compress) {
                    newInputStream = new GZIPInputStream(newInputStream);
                }
                try {
                    ByteStreams.copy(newInputStream, byteArrayOutputStream);
                    newInputStream.close();
                    this.diskByteUsage.addAndGet(-Files.size(resolve));
                    Files.delete(resolve);
                } catch (Throwable th) {
                    newInputStream.close();
                    throw th;
                }
            }
            treeMap.put(Long.valueOf(j3), new Page(this, j3, new ByteArrayInputStream(byteArrayOutputStream.toByteArray())));
            j2 = j3 + 1;
        }
    }

    private boolean loadPageFromFile(long j) throws IOException {
        if (!$assertionsDisabled && !this.lock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        this.lock.unlock();
        this.loadPageSemaphore.acquireUninterruptibly();
        try {
            NavigableMap<Long, Page<E>> readPagesFromExternal = readPagesFromExternal(j, this.minReadPages);
            this.loadPageSemaphore.release();
            this.lock.lock();
            if (readPagesFromExternal != null) {
                this.readQueue.putAll(readPagesFromExternal);
            }
            if (!this.readPage.empty()) {
                return true;
            }
            long j2 = this.readPage.id + 1;
            if (fetchFromQueues(j2)) {
                return true;
            }
            if (j2 != this.writePage.id || this.readPage == this.writePage) {
                return false;
            }
            this.readPage = this.writePage;
            this.notFull.signalAll();
            return !this.readPage.empty();
        } catch (Throwable th) {
            this.loadPageSemaphore.release();
            this.lock.lock();
            throw th;
        }
    }

    private boolean fetchFromQueues(long j) {
        if (fetchFromQueue(this.readQueue, j, false)) {
            return true;
        }
        if (this.diskQueue == null) {
            return false;
        }
        return fetchFromQueue(this.diskQueue, j, true);
    }

    private boolean fetchFromQueue(NavigableMap<Long, Page<E>> navigableMap, long j, boolean z) {
        if (!$assertionsDisabled && !this.lock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !this.readPage.empty()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && j != this.readPage.id + 1) {
            throw new AssertionError();
        }
        Page<E> page = (Page) navigableMap.remove(Long.valueOf(j));
        if (page == null) {
            return false;
        }
        if (z) {
            this.diskQueueSize.decrementAndGet();
        }
        this.readPage = page;
        this.notEmpty.signalAll();
        this.notFull.signalAll();
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int drainTo(Collection<? super E> collection, int i) throws IOException {
        if (this.closed.get()) {
            throw new IllegalStateException("attempted read after close()");
        }
        if (collection == null) {
            throw new NullPointerException();
        }
        if (collection == this) {
            throw new IllegalArgumentException();
        }
        if (i <= 0) {
            return 0;
        }
        int i2 = 0;
        propagateError();
        this.lock.lock();
        while (!this.closed.get()) {
            try {
                long j = this.readPage.id + 1;
                if (!this.readPage.empty()) {
                    i2 = this.readPage.drainTo(collection, i2, i);
                } else if (fetchFromQueues(j)) {
                    i2 = this.readPage.drainTo(collection, i2, i);
                } else if (j == this.writePage.id && this.readPage != this.writePage) {
                    this.readPage = this.writePage;
                    this.notFull.signalAll();
                } else {
                    if (j >= this.writePage.id || !loadPageFromFile(j)) {
                        return i2;
                    }
                    i2 = this.readPage.drainTo(collection, i2, i);
                }
                if (i2 == i) {
                    int i3 = i2;
                    this.lock.unlock();
                    return i3;
                }
            } finally {
                this.lock.unlock();
            }
        }
        throw new IllegalStateException("read did not complete before close()");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public E get(boolean z, long j, TimeUnit timeUnit) throws InterruptedException, IOException {
        if (this.closed.get()) {
            throw new IllegalStateException("attempted read after close()");
        }
        propagateError();
        long nanos = timeUnit == null ? 0L : timeUnit.toNanos(j);
        this.lock.lock();
        while (!this.closed.get()) {
            try {
                long j2 = this.readPage.id + 1;
                if (!this.readPage.empty()) {
                    E e = this.readPage.get(z);
                    this.lock.unlock();
                    return e;
                }
                if (fetchFromQueues(j2)) {
                    E e2 = this.readPage.get(z);
                    this.lock.unlock();
                    return e2;
                }
                if (j2 == this.writePage.id && this.readPage != this.writePage) {
                    this.readPage = this.writePage;
                    this.notFull.signalAll();
                } else {
                    if (j2 < this.writePage.id && loadPageFromFile(j2)) {
                        E e3 = this.readPage.get(z);
                        this.lock.unlock();
                        return e3;
                    }
                    if (timeUnit == null) {
                        this.notEmpty.await();
                    } else {
                        if (nanos <= 0) {
                            return null;
                        }
                        nanos = this.notEmpty.awaitNanos(nanos);
                    }
                }
            } finally {
                this.lock.unlock();
            }
        }
        throw new IllegalStateException("read did not complete before close()");
    }

    private void testNotEmpty() {
        if ((this.readPage.id + 1 == this.writePage.id && this.readPage.empty()) || (this.readPage == this.writePage && this.writePage.count == 1)) {
            this.notEmpty.signal();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean offer(E e, byte[] bArr, long j, TimeUnit timeUnit) throws IOException, InterruptedException {
        if (this.closed.get()) {
            throw new IllegalStateException("attempted write after close()");
        }
        Preconditions.checkNotNull(e);
        propagateError();
        long nanos = timeUnit == null ? 0L : timeUnit.toNanos(j);
        this.lock.lock();
        while (!this.closed.get()) {
            try {
                if (!this.writePage.full()) {
                    this.writePage.add(e, this.memoryDouble ? bArr : null);
                    testNotEmpty();
                    this.fastWrite.getAndIncrement();
                    this.lock.unlock();
                    return true;
                }
                if (this.maxDiskBytes <= 0 || this.diskByteUsage.get() <= this.maxDiskBytes) {
                    Page<E> page = this.writePage;
                    this.writePage = new Page<>(this, page.id + 1);
                    this.writePage.add(e, this.memoryDouble ? bArr : null);
                    if (this.readPage == page) {
                        this.fastWrite.getAndIncrement();
                    } else if (this.diskQueue == null || this.diskQueueSize.get() > this.maxPages) {
                        foregroundWrite(page);
                        this.slowWrite.getAndIncrement();
                    } else {
                        Page<E> put = this.diskQueue.put(Long.valueOf(page.id), page);
                        if (!$assertionsDisabled && put != null) {
                            throw new AssertionError();
                        }
                        this.diskQueueSize.incrementAndGet();
                        this.fastWrite.getAndIncrement();
                    }
                    testNotEmpty();
                    this.lock.unlock();
                    return true;
                }
                if (timeUnit == null) {
                    this.notFull.await();
                } else {
                    if (nanos <= 0) {
                        return false;
                    }
                    nanos = this.notFull.awaitNanos(nanos);
                }
            } finally {
                this.lock.unlock();
            }
        }
        throw new IllegalStateException("write did not complete before close()");
    }

    private void foregroundWrite(Page<E> page) throws IOException {
        if (!$assertionsDisabled && !this.lock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        this.lock.unlock();
        try {
            page.writeToFile();
        } finally {
            this.lock.lock();
            signalConsumers(page.id);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public IOException getError() {
        return this.error.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setError(IOException iOException) {
        this.error.compareAndSet(null, iOException);
    }

    private void propagateError() throws IOException {
        IOException iOException = this.error.get();
        if (iOException != null) {
            throw iOException;
        }
    }

    public void clear() throws IOException {
        this.lock.lock();
        try {
            this.writePage = new Page<>(this, 0L);
            this.readPage = this.writePage;
            this.queueSize.set(0L);
            if (this.diskQueue != null) {
                this.diskQueue.clear();
            }
            if (this.diskQueueSize != null) {
                this.diskQueueSize.set(0);
            }
            synchronized (this.external) {
                LessPaths.recursiveDelete(this.external);
                Files.createDirectories(this.external, new FileAttribute[0]);
            }
        } finally {
            this.lock.unlock();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.closed.getAndSet(true)) {
            try {
                this.closeFuture.get();
                return;
            } catch (InterruptedException | ExecutionException e) {
                throw Throwables.propagate(e);
            }
        }
        long currentTimeMillis = System.currentTimeMillis() + this.terminationWait.toMillis();
        try {
            if (this.backgroundTasks != null) {
                Iterator<ScheduledFuture<?>> it = this.backgroundTasks.iterator();
                while (it.hasNext()) {
                    it.next().cancel(true);
                }
            }
            if (this.executor != null) {
                this.executor.shutdown();
                try {
                    try {
                        if (!this.silent) {
                            log.info("Waiting on background threads to write approximately {} pages", Integer.valueOf(this.diskQueueSize.get()));
                        }
                        this.executor.awaitTermination(this.terminationWait.toMillis(), TimeUnit.MILLISECONDS);
                        this.executor.shutdownNow();
                    } catch (Throwable th) {
                        this.executor.shutdownNow();
                        throw th;
                    }
                } catch (InterruptedException e2) {
                    this.closeFuture.completeExceptionally(e2);
                    Throwables.propagate(e2);
                    this.executor.shutdownNow();
                }
            }
            int calculateDirtyPageCount = calculateDirtyPageCount();
            if (!this.silent) {
                log.info("Foreground thread must write approximately {} pages", Integer.valueOf(calculateDirtyPageCount));
            }
            if ((System.currentTimeMillis() < currentTimeMillis) && !this.readPage.empty()) {
                this.lock.lock();
                try {
                    foregroundWrite(this.readPage);
                    this.readPage.clear();
                    this.lock.unlock();
                } finally {
                }
            }
            boolean z = System.currentTimeMillis() < currentTimeMillis;
            if (z) {
                z = flushQueue(this.readQueue, currentTimeMillis, false);
            }
            if (z && this.diskQueue != null) {
                z = flushQueue(this.diskQueue, currentTimeMillis, true);
            }
            if (z && !this.writePage.empty()) {
                this.lock.lock();
                try {
                    foregroundWrite(this.writePage);
                    this.writePage.clear();
                    this.lock.unlock();
                } finally {
                }
            }
            long j = this.queueSize.get();
            int calculateDirtyPageCount2 = calculateDirtyPageCount();
            if (calculateDirtyPageCount2 > 0) {
                log.warn("Closing of disk-backed queue timed out before writing all pages to disk. Approximately {} pages were not written to disk.", Integer.valueOf(calculateDirtyPageCount2));
                j -= calculateDirtyPageCount2 * this.pageSize;
            }
            Files.write(this.external.resolve("size"), Long.toString(j).getBytes(), new OpenOption[0]);
            IOException iOException = this.error.get();
            if (iOException != null) {
                this.closeFuture.completeExceptionally(iOException);
                Throwables.propagate(iOException);
            } else {
                this.closeFuture.complete(null);
            }
        } catch (IOException e3) {
            this.closeFuture.completeExceptionally(e3);
            Throwables.propagate(e3);
        }
    }

    public long getDiskByteUsage() {
        return this.diskByteUsage.get();
    }

    public long size() {
        return this.queueSize.get();
    }

    public Path getPath() {
        return this.external;
    }

    public long getFastWrite() {
        return this.fastWrite.get();
    }

    public long getSlowWrite() {
        return this.slowWrite.get();
    }

    private int calculateDirtyPageCount() {
        int size = (this.readPage.empty() ? 0 : 1) + this.readQueue.size();
        if (this.diskQueueSize != null) {
            size += this.diskQueueSize.get();
        }
        if (this.readPage != this.writePage && !this.writePage.empty()) {
            size++;
        }
        return size;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void signalConsumers(long j) {
        this.lock.lock();
        try {
            if (this.readPage.empty() && this.readPage.id + 1 == j) {
                this.notEmpty.signalAll();
            }
        } finally {
            this.lock.unlock();
        }
    }

    static {
        $assertionsDisabled = !DiskBackedQueueInternals.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(DiskBackedQueue.class);
        sharedExecutor = new ScheduledThreadPoolExecutor(0, new ThreadFactoryBuilder().setNameFormat("disk-backed-queue-shared-writer-%d").build());
        SIZEFILE = Paths.get("size", new String[0]);
        EXCLUDE_SIZEFILE = path -> {
            return !path.getFileName().equals(SIZEFILE);
        };
    }
}
