package org.apache.jackrabbit.oak.segment.remote.queue;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.class */
/**
 * Bounded queue of pending segment writes that is drained by a fixed pool of worker threads.
 *
 * <p>Segments handed to {@link #addToQueue} are registered in a map keyed by UUID (so that
 * {@link #read} can return them while they are still pending) and placed on a bounded deque.
 * A number of "main" workers poll the deque and pass each segment to the configured
 * {@link SegmentConsumer}. When a write fails the queue is flagged as <em>broken</em>: producers
 * and main workers pause, and a single dedicated "emergency" worker keeps retrying until a write
 * succeeds, which clears the flag again.
 */
public class SegmentWriteQueue implements Closeable {

    /** Number of main worker threads; overridable via the {@code oak.segment.remote.threads} system property. */
    public static final int THREADS = Integer.getInteger("oak.segment.remote.threads", 5);

    /** Capacity of the pending-write deque; overridable via the {@code oak.segment.remote.queue.size} system property. */
    private static final int QUEUE_SIZE = Integer.getInteger("oak.segment.remote.queue.size", 20);

    private static final Logger log = LoggerFactory.getLogger(SegmentWriteQueue.class);

    /** Pending writes waiting for a worker. */
    private final BlockingDeque<SegmentWriteAction> queue;

    /**
     * Every segment that was accepted but not yet persisted (or dropped). This object is also
     * used as the monitor that {@link #flush()} waits on.
     */
    private final Map<UUID, SegmentWriteAction> segmentsByUUID;

    private final ExecutorService executor;

    /** Producers hold the read lock while enqueueing; {@link #flush()} holds the write lock. */
    private final ReadWriteLock flushLock;

    private final SegmentConsumer writer;

    private volatile boolean shutdown;

    /** Monitor guarding transitions of {@link #broken}. */
    private final Object brokenMonitor;

    private volatile boolean broken;

    /** Signals that the given segment could not be handed to the {@link SegmentConsumer}. */
    public static class SegmentConsumeException extends Exception {

        /** The segment whose write failed; used by the worker loops to retry it. */
        private final SegmentWriteAction segment;

        /**
         * @param segmentWriteAction the segment that failed to be written
         * @param exc the underlying failure, stored as the cause
         */
        public SegmentConsumeException(SegmentWriteAction segmentWriteAction, Exception exc) {
            super(exc);
            this.segment = segmentWriteAction;
        }
    }

    /** Sink that actually persists a segment. */
    public interface SegmentConsumer {

        /**
         * Persists one segment.
         *
         * @param remoteSegmentArchiveEntry the archive entry describing the segment
         * @param data buffer holding the segment bytes
         * @param offset presumably the start position inside {@code data} - TODO confirm against SegmentWriteAction
         * @param size presumably the number of bytes to write - TODO confirm against SegmentWriteAction
         * @throws IOException if the segment cannot be written
         */
        void consume(RemoteSegmentArchiveEntry remoteSegmentArchiveEntry, byte[] data, int offset, int size) throws IOException;
    }

    /**
     * Creates a queue using the capacity and thread count taken from the system properties.
     *
     * @param segmentConsumer the sink that receives the queued segments
     */
    public SegmentWriteQueue(SegmentConsumer segmentConsumer) {
        this(segmentConsumer, QUEUE_SIZE, THREADS);
    }

    /**
     * Creates a queue and immediately starts its workers.
     *
     * @param segmentConsumer the sink that receives the queued segments
     * @param queueSize capacity of the pending-write deque
     * @param threadCount number of main workers; one additional emergency worker is always started
     */
    SegmentWriteQueue(SegmentConsumer segmentConsumer, int queueSize, int threadCount) {
        this.brokenMonitor = new Object();
        this.writer = segmentConsumer;
        this.segmentsByUUID = new ConcurrentHashMap<>();
        this.flushLock = new ReentrantReadWriteLock();
        this.queue = new LinkedBlockingDeque<>(queueSize);
        // One extra thread is reserved for the emergency loop.
        this.executor = Executors.newFixedThreadPool(threadCount + 1);
        for (int worker = 0; worker < threadCount; worker++) {
            this.executor.submit(this::mainLoop);
        }
        this.executor.submit(this::emergencyLoop);
    }

    /**
     * Body of a main worker: while the queue is healthy, polls segments and writes them.
     * A segment whose write fails is put back on the queue so that it can be retried.
     */
    private void mainLoop() {
        while (!shutdown) {
            try {
                waitWhileBroken();
                if (shutdown) {
                    return;
                }
                consume();
            } catch (SegmentConsumeException e) {
                SegmentWriteAction failed = e.segment;
                log.error("Can't persist the segment {}", failed.getUuid(), e.getCause());
                try {
                    queue.put(failed);
                } catch (InterruptedException interrupted) {
                    log.error("Can't re-add the segment {} to the queue. It'll be dropped.", failed.getUuid(), interrupted);
                    // The segment is lost; unregister it and wake up flush() so it does not wait for it.
                    synchronized (segmentsByUUID) {
                        segmentsByUUID.remove(failed.getUuid());
                        segmentsByUUID.notifyAll();
                    }
                }
            }
        }
    }

    /**
     * Polls the queue for up to 100 ms and writes the segment obtained, if any.
     *
     * @throws SegmentConsumeException if the polled segment could not be written
     */
    private void consume() throws SegmentConsumeException {
        SegmentWriteAction next = null;
        try {
            next = queue.poll(100L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // The interrupt flag is intentionally not restored: the callers poll in a loop and
            // would otherwise fail immediately on every subsequent poll.
            log.error("Poll from queue interrupted", e);
        }
        if (next != null) {
            consume(next);
        }
    }

    /**
     * Passes the segment to the writer. On success the segment is unregistered, waiters on the
     * segment map are notified and the broken flag is cleared; on failure the broken flag is set.
     *
     * @param segmentWriteAction the segment to write
     * @throws SegmentConsumeException wrapping the failure, carrying the segment for a retry
     */
    private void consume(SegmentWriteAction segmentWriteAction) throws SegmentConsumeException {
        try {
            segmentWriteAction.passTo(writer);
            synchronized (segmentsByUUID) {
                segmentsByUUID.remove(segmentWriteAction.getUuid());
                segmentsByUUID.notifyAll();
            }
            setBroken(false);
        } catch (IOException | RuntimeException e) {
            setBroken(true);
            throw new SegmentConsumeException(segmentWriteAction, e);
        }
    }

    /**
     * Body of the emergency worker: sleeps until the queue is flagged as broken, then keeps
     * attempting writes (pausing one second after each failure) until one attempt completes
     * without an exception or the queue is shut down.
     */
    private void emergencyLoop() {
        while (!shutdown) {
            waitUntilBroken();
            if (shutdown) {
                return;
            }
            boolean recovered = false;
            SegmentWriteAction segmentToRetry = null;
            do {
                try {
                    if (segmentToRetry == null) {
                        consume();
                    } else {
                        // Retry the exact segment that failed in the previous iteration.
                        consume(segmentToRetry);
                    }
                    recovered = true;
                } catch (SegmentConsumeException e) {
                    segmentToRetry = e.segment;
                    log.error("Can't persist the segment {}", segmentToRetry.getUuid(), e.getCause());
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException interrupted) {
                        log.warn("Interrupted", interrupted);
                    }
                    if (shutdown) {
                        log.error("Shutdown initiated. The segment {} will be dropped.", segmentToRetry.getUuid());
                    }
                }
            } while (!recovered && !shutdown);
        }
    }

    /**
     * Registers a new segment and enqueues it for writing. Blocks while the queue is broken and
     * for up to one minute while the queue is full.
     *
     * @param remoteSegmentArchiveEntry the archive entry describing the segment
     * @param data buffer holding the segment bytes
     * @param offset passed through to {@code SegmentWriteAction}; presumably the start position in {@code data} - TODO confirm
     * @param size passed through to {@code SegmentWriteAction}; presumably the byte count - TODO confirm
     * @throws IOException if the segment could not be enqueued within one minute, or the thread
     *         was interrupted while waiting (the interrupt flag is restored in that case)
     * @throws IllegalStateException if the queue is shutting down
     */
    public void addToQueue(RemoteSegmentArchiveEntry remoteSegmentArchiveEntry, byte[] data, int offset, int size) throws IOException {
        waitWhileBroken();
        if (shutdown) {
            throw new IllegalStateException("Can't accept the new segment - shutdown in progress");
        }
        SegmentWriteAction action = new SegmentWriteAction(remoteSegmentArchiveEntry, data, offset, size);
        flushLock.readLock().lock();
        try {
            // Register first so the segment is readable as soon as it may be picked up by a worker.
            segmentsByUUID.put(action.getUuid(), action);
            if (!queue.offer(action, 1L, TimeUnit.MINUTES)) {
                segmentsByUUID.remove(action.getUuid());
                throw new IOException("Can't add segment to the queue");
            }
        } catch (InterruptedException e) {
            segmentsByUUID.remove(action.getUuid());
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } finally {
            flushLock.readLock().unlock();
        }
    }

    /**
     * Blocks until every registered segment has been persisted or dropped. New segments cannot be
     * added while the flush is in progress. An error is logged for every full minute of waiting.
     *
     * @throws IOException if the thread was interrupted while waiting (the interrupt flag is restored)
     */
    public void flush() throws IOException {
        flushLock.writeLock().lock();
        try {
            synchronized (segmentsByUUID) {
                long lastReport = System.currentTimeMillis();
                while (!segmentsByUUID.isEmpty()) {
                    segmentsByUUID.wait(100L);
                    if (System.currentTimeMillis() - lastReport > TimeUnit.MINUTES.toMillis(1L)) {
                        log.error("Can't flush the queue in 1 minute. Queue: {}. Segment map: {}", queue, segmentsByUUID);
                        lastReport = System.currentTimeMillis();
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } finally {
            flushLock.writeLock().unlock();
        }
    }

    /**
     * Looks up a segment that has been accepted but is still registered as pending.
     *
     * @param uuid the segment identifier
     * @return the pending write action, or {@code null} if no such segment is registered
     */
    public SegmentWriteAction read(UUID uuid) {
        return segmentsByUUID.get(uuid);
    }

    /**
     * Requests shutdown and waits up to one minute for the workers to terminate.
     *
     * @throws IOException if the workers did not terminate in time, or the thread was interrupted
     *         while waiting (the interrupt flag is restored)
     */
    @Override
    public void close() throws IOException {
        shutdown = true;
        try {
            executor.shutdown();
            if (!executor.awaitTermination(1L, TimeUnit.MINUTES)) {
                throw new IOException("The write wasn't able to shut down clearly");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /** @return {@code true} if no segment is currently registered as pending */
    public boolean isEmpty() {
        return segmentsByUUID.isEmpty();
    }

    /** @return the current value of the broken flag */
    boolean isBroken() {
        return broken;
    }

    /** @return the number of segments currently sitting in the deque */
    int getSize() {
        return queue.size();
    }

    /** Updates the broken flag and wakes every thread waiting on its monitor. */
    private void setBroken(boolean value) {
        synchronized (brokenMonitor) {
            broken = value;
            brokenMonitor.notifyAll();
        }
    }

    /** Blocks until the queue is no longer broken or shutdown has been requested. */
    private void waitWhileBroken() {
        if (!broken) {
            return;
        }
        synchronized (brokenMonitor) {
            while (broken && !shutdown) {
                try {
                    // Timed wait so that a shutdown request is noticed without a notification.
                    brokenMonitor.wait(100L);
                } catch (InterruptedException e) {
                    log.warn("Interrupted", e);
                }
            }
        }
    }

    /** Blocks until the queue becomes broken or shutdown has been requested. */
    private void waitUntilBroken() {
        if (broken) {
            return;
        }
        synchronized (brokenMonitor) {
            while (!broken && !shutdown) {
                try {
                    // Timed wait so that a shutdown request is noticed without a notification.
                    brokenMonitor.wait(100L);
                } catch (InterruptedException e) {
                    log.warn("Interrupted", e);
                }
            }
        }
    }
}
