package org.apache.druid.frame.channel;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.druid.frame.Frame;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;

/* loaded from: input_file:org/apache/druid/frame/channel/BlockingQueueFrameChannel.class */
/**
 * In-memory, bounded frame channel with one {@link WritableFrameChannel} end and one
 * {@link ReadableFrameChannel} end, both backed by the same queue.
 *
 * <p>Despite the name, no method in this class blocks waiting for space or data: {@code write} and
 * {@code read} throw when the channel is full or has nothing to read, and callers use
 * {@code writabilityFuture()} / {@code readabilityFuture()} to learn when to try again.
 *
 * <p>All access to the queue and to the two pending futures happens while holding {@code lock}.
 * Pending futures are also completed while {@code lock} is held.
 */
public class BlockingQueueFrameChannel {
    /** Sentinel queued by the writer to mark the end of the channel; always the empty {@link Optional}. */
    private static final Optional<Either<Throwable, FrameWithPartition>> END_MARKER = Optional.empty();

    /** Maximum number of queue entries at which {@code write} still accepts a frame. */
    private final int maxQueuedFrames;
    private final Writable writable;
    private final Readable readable;

    /**
     * Pending entries in write order. Each entry is a frame, an error, or {@link #END_MARKER}.
     * Allocated with room for {@code maxQueuedFrames + 1} entries so the end marker fits after a full
     * queue of frames.
     */
    @GuardedBy("lock")
    private final ArrayDeque<Optional<Either<Throwable, FrameWithPartition>>> queue;
    private final Object lock = new Object();

    /** Future handed to a writer waiting for capacity, or null if nobody is waiting. */
    @GuardedBy("lock")
    private SettableFuture<?> readyForWritingFuture = null;

    /** Future handed to a reader waiting for data, or null if nobody is waiting. */
    @GuardedBy("lock")
    private SettableFuture<?> readyForReadingFuture = null;

    /* loaded from: input_file:org/apache/druid/frame/channel/BlockingQueueFrameChannel$Readable.class */
    /** Reading end of the channel. */
    private class Readable implements ReadableFrameChannel {
        private Readable() {
        }

        /** Returns true once the end marker has reached the head of the queue. */
        @Override // org.apache.druid.frame.channel.ReadableFrameChannel
        public boolean isFinished() {
            return BlockingQueueFrameChannel.this.isFinished();
        }

        /** Returns true if the queue holds an entry and that entry is not the end marker. */
        @Override // org.apache.druid.frame.channel.ReadableFrameChannel
        public boolean canRead() {
            synchronized (BlockingQueueFrameChannel.this.lock) {
                return !BlockingQueueFrameChannel.this.queue.isEmpty() && !isFinished();
            }
        }

        /**
         * Removes and returns the frame at the head of the queue, then wakes a waiting writer.
         *
         * <p>If the head entry is an error written by {@code fail}, the entry is still removed and the
         * error is raised by {@code Either.valueOrThrow()}.
         *
         * @throws NoSuchElementException if the channel is finished or the queue is empty
         */
        @Override // org.apache.druid.frame.channel.ReadableFrameChannel
        public Frame read() {
            final Optional<Either<Throwable, FrameWithPartition>> next;
            synchronized (BlockingQueueFrameChannel.this.lock) {
                if (isFinished()) {
                    throw new NoSuchElementException();
                }
                next = BlockingQueueFrameChannel.this.queue.poll();
                if (next == null || !next.isPresent()) {
                    throw new NoSuchElementException();
                }
                // A slot was just freed, so a writer waiting on capacity can proceed.
                BlockingQueueFrameChannel.this.notifyWriter();
            }
            // Unwrap outside the lock; this is where a queued error is thrown.
            return next.get().valueOrThrow().frame();
        }

        /**
         * Returns a future that is already complete when the queue is non-empty (the end marker counts
         * as an entry), and otherwise a shared pending future that is completed by the next write,
         * failure, or close on the writing end.
         */
        @Override // org.apache.druid.frame.channel.ReadableFrameChannel
        public ListenableFuture<?> readabilityFuture() {
            synchronized (BlockingQueueFrameChannel.this.lock) {
                if (!BlockingQueueFrameChannel.this.queue.isEmpty()) {
                    return Futures.immediateFuture(null);
                }
                if (BlockingQueueFrameChannel.this.readyForReadingFuture == null) {
                    BlockingQueueFrameChannel.this.readyForReadingFuture = SettableFuture.create();
                }
                return BlockingQueueFrameChannel.this.readyForReadingFuture;
            }
        }

        /** Discards every queued entry (including any end marker) and wakes a waiting writer. */
        @Override // org.apache.druid.frame.channel.ReadableFrameChannel, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            synchronized (BlockingQueueFrameChannel.this.lock) {
                BlockingQueueFrameChannel.this.queue.clear();
                BlockingQueueFrameChannel.this.notifyWriter();
            }
        }
    }

    /* loaded from: input_file:org/apache/druid/frame/channel/BlockingQueueFrameChannel$Writable.class */
    /** Writing end of the channel. */
    private class Writable implements WritableFrameChannel {
        private Writable() {
        }

        /**
         * Appends a frame to the queue and wakes a waiting reader.
         *
         * @throws ISE if the writer has already closed the channel, if the queue is at capacity, or if
         *             the queue unexpectedly refuses the frame
         */
        @Override // org.apache.druid.frame.channel.WritableFrameChannel
        public void write(FrameWithPartition frameWithPartition) {
            synchronized (BlockingQueueFrameChannel.this.lock) {
                // Checking only the head of the queue would miss a close() that happened while unread
                // frames were still queued; a frame appended behind the end marker would never be read.
                if (BlockingQueueFrameChannel.this.isFinished() || BlockingQueueFrameChannel.this.isWriterClosed()) {
                    throw new ISE("Channel cannot accept new frames");
                }
                if (BlockingQueueFrameChannel.this.queue.size() >= BlockingQueueFrameChannel.this.maxQueuedFrames) {
                    throw new ISE("Channel has no capacity");
                }
                if (!BlockingQueueFrameChannel.this.queue.offer(Optional.of(Either.value(frameWithPartition)))) {
                    throw new ISE("Channel had capacity, but could not add frame");
                }
                BlockingQueueFrameChannel.this.notifyReader();
            }
        }

        /**
         * Returns a future that is already complete when the queue is below capacity, and otherwise a
         * shared pending future that is completed the next time the reader removes an entry or closes.
         */
        @Override // org.apache.druid.frame.channel.WritableFrameChannel
        public ListenableFuture<?> writabilityFuture() {
            synchronized (BlockingQueueFrameChannel.this.lock) {
                if (BlockingQueueFrameChannel.this.queue.size() < BlockingQueueFrameChannel.this.maxQueuedFrames) {
                    return Futures.immediateFuture(null);
                }
                if (BlockingQueueFrameChannel.this.readyForWritingFuture == null) {
                    BlockingQueueFrameChannel.this.readyForWritingFuture = SettableFuture.create();
                }
                return BlockingQueueFrameChannel.this.readyForWritingFuture;
            }
        }

        /**
         * Fails the channel: drops everything still queued, queues the error, and then closes the
         * writing end so the reader sees the error followed by the end marker.
         *
         * @param th the failure cause; when null, a generic {@link ISE} with message "Failed" is used
         */
        @Override // org.apache.druid.frame.channel.WritableFrameChannel
        public void fail(@Nullable Throwable th) {
            synchronized (BlockingQueueFrameChannel.this.lock) {
                BlockingQueueFrameChannel.this.queue.clear();
                final Throwable cause = th != null ? th : new ISE("Failed");
                if (!BlockingQueueFrameChannel.this.queue.offer(Optional.of(Either.error(cause)))) {
                    throw new ISE("Could not write error to channel");
                }
                // The error is now at the head, so close() below appends the end marker after it.
                close();
            }
        }

        /**
         * Appends the end marker and wakes a waiting reader.
         *
         * <p>The "Already done" check intentionally looks only at the head of the queue, so calling
         * close() again while earlier entries (for example the error queued by {@code fail}) are still
         * unread appends another end marker instead of throwing.
         *
         * @throws ISE if the end marker is already at the head of the queue
         */
        @Override // org.apache.druid.frame.channel.WritableFrameChannel, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            synchronized (BlockingQueueFrameChannel.this.lock) {
                if (BlockingQueueFrameChannel.this.isFinished()) {
                    throw new ISE("Already done");
                }
                if (!BlockingQueueFrameChannel.this.queue.offer(BlockingQueueFrameChannel.END_MARKER)) {
                    throw new ISE("Channel had capacity, but could not add end marker");
                }
                BlockingQueueFrameChannel.this.notifyReader();
            }
        }
    }

    /**
     * Creates a channel that accepts up to {@code i} queued frames.
     *
     * @param i maximum number of queued frames; must be at least 1 and less than
     *          {@link Integer#MAX_VALUE} (one extra slot is reserved, so MAX_VALUE would overflow)
     * @throws IAE if {@code i} is outside that range
     */
    public BlockingQueueFrameChannel(int i) {
        if (i < 1 || i == Integer.MAX_VALUE) {
            throw new IAE("Cannot handle capacity of [%d]", i);
        }
        this.maxQueuedFrames = i;
        this.queue = new ArrayDeque<>(i + 1);
        this.writable = new Writable();
        this.readable = new Readable();
    }

    /** Returns the writing end of this channel. The same instance is returned on every call. */
    public WritableFrameChannel writable() {
        return this.writable;
    }

    /** Returns the reading end of this channel. The same instance is returned on every call. */
    public ReadableFrameChannel readable() {
        return this.readable;
    }

    /** Creates a channel with a capacity of one frame. */
    public static BlockingQueueFrameChannel minimal() {
        return new BlockingQueueFrameChannel(1);
    }

    /**
     * Returns true when the end marker is at the head of the queue, i.e. the writer has closed the
     * channel and every entry written before that has been consumed.
     */
    public boolean isFinished() {
        synchronized (this.lock) {
            return END_MARKER.equals(this.queue.peek());
        }
    }

    /**
     * Returns true when the most recently queued entry is the end marker, i.e. the writer has closed
     * the channel, whether or not earlier entries are still unread.
     */
    private boolean isWriterClosed() {
        synchronized (this.lock) {
            // peekLast() returns null on an empty queue, which equals() treats as "not closed".
            return END_MARKER.equals(this.queue.peekLast());
        }
    }

    /** Completes and clears the pending writability future, if any. Callers must hold {@code lock}. */
    @GuardedBy("lock")
    public void notifyWriter() {
        final SettableFuture<?> pending = this.readyForWritingFuture;
        if (pending != null) {
            // Clear the field first so the next waiter gets a fresh future.
            this.readyForWritingFuture = null;
            pending.set(null);
        }
    }

    /** Completes and clears the pending readability future, if any. Callers must hold {@code lock}. */
    @GuardedBy("lock")
    public void notifyReader() {
        final SettableFuture<?> pending = this.readyForReadingFuture;
        if (pending != null) {
            // Clear the field first so the next waiter gets a fresh future.
            this.readyForReadingFuture = null;
            pending.set(null);
        }
    }
}
