package org.apache.druid.frame.channel;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.file.FrameFileWriter;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DruidMetrics;

/* loaded from: input_file:org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.class */
public class ReadableByteChunksFrameChannel implements ReadableFrameChannel {
    private static final Logger log;
    private static final long MAX_FRAME_SIZE_BYTES = 100000000;
    private static final int UNKNOWN_LENGTH = -1;
    private static final int FRAME_MARKER_BYTES = 1;
    private static final int FRAME_MARKER_AND_COMPRESSED_ENVELOPE_BYTES = 26;
    private final String id;
    private final long bytesLimit;

    @GuardedBy("lock")
    private StreamPart streamPart;
    private final boolean framesOnly;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final List<Either<Throwable, byte[]>> chunks = new ArrayList();

    @GuardedBy("lock")
    private SettableFuture<?> addChunkBackpressureFuture = null;

    @GuardedBy("lock")
    private SettableFuture<?> readyForReadingFuture = null;

    @GuardedBy("lock")
    private boolean noMoreWrites = false;

    @GuardedBy("lock")
    private int positionInFirstChunk = 0;

    @GuardedBy("lock")
    private long bytesBuffered = 0;

    @GuardedBy("lock")
    private long bytesAdded = 0;

    @GuardedBy("lock")
    private long nextCompressedFrameLength = -1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/frame/channel/ReadableByteChunksFrameChannel$StreamPart.class */
    public enum StreamPart {
        MAGIC,
        FRAMES,
        FOOTER
    }

    private ReadableByteChunksFrameChannel(String str, long j, boolean z) {
        this.id = (String) Preconditions.checkNotNull(str, DruidMetrics.ID);
        this.bytesLimit = j;
        this.streamPart = z ? StreamPart.FRAMES : StreamPart.MAGIC;
        this.framesOnly = z;
    }

    public static ReadableByteChunksFrameChannel create(String str, boolean z) {
        return new ReadableByteChunksFrameChannel(str, 1L, z);
    }

    @Nullable
    public ListenableFuture<?> addChunk(byte[] bArr) {
        SettableFuture<?> settableFuture;
        synchronized (this.lock) {
            if (this.noMoreWrites) {
                throw new ISE("Channel is no longer accepting writes", new Object[0]);
            }
            try {
                if (bArr.length > 0) {
                    this.bytesAdded += bArr.length;
                    if (this.streamPart != StreamPart.FOOTER) {
                        this.chunks.add(Either.value(bArr));
                        this.bytesBuffered += bArr.length;
                    }
                    updateStreamState();
                    if (this.readyForReadingFuture != null && canReadFrame()) {
                        this.readyForReadingFuture.set((Object) null);
                        this.readyForReadingFuture = null;
                    }
                }
                if (this.addChunkBackpressureFuture == null && this.bytesBuffered >= this.bytesLimit && canReadFrame()) {
                    this.addChunkBackpressureFuture = SettableFuture.create();
                }
                settableFuture = this.addChunkBackpressureFuture;
            } catch (Throwable th) {
                setError(th);
                return null;
            }
        }
        return settableFuture;
    }

    public void setError(Throwable th) {
        synchronized (this.lock) {
            if (this.noMoreWrites) {
                log.noStackTrace().warn(th, "Channel is no longer accepting writes, cannot propagate exception", new Object[0]);
            } else {
                this.chunks.clear();
                this.chunks.add(Either.error(th));
                this.nextCompressedFrameLength = -1L;
                doneWriting();
            }
        }
    }

    public void doneWriting() {
        synchronized (this.lock) {
            this.noMoreWrites = true;
            if (this.readyForReadingFuture != null) {
                this.readyForReadingFuture.set((Object) null);
                this.readyForReadingFuture = null;
            }
        }
    }

    @Override // org.apache.druid.frame.channel.ReadableFrameChannel
    public boolean isFinished() {
        boolean z;
        synchronized (this.lock) {
            z = this.chunks.isEmpty() && this.noMoreWrites && !canRead();
        }
        return z;
    }

    @Override // org.apache.druid.frame.channel.ReadableFrameChannel
    public boolean canRead() {
        synchronized (this.lock) {
            if (this.framesOnly) {
                return canReadError() || canReadFrame();
            }
            return canReadError() || canReadFrame() || (this.streamPart != StreamPart.FOOTER && this.noMoreWrites);
        }
    }

    @Override // org.apache.druid.frame.channel.ReadableFrameChannel
    public Frame read() {
        Frame nextFrame;
        synchronized (this.lock) {
            if (canReadError()) {
                Throwable error = this.chunks.remove(0).map(bArr -> {
                    return null;
                }).error();
                Throwables.propagateIfPossible(error);
                throw new RuntimeException(error);
            }
            if (!canReadFrame()) {
                if (this.noMoreWrites) {
                    this.chunks.clear();
                    this.nextCompressedFrameLength = -1L;
                    throw new ISE("Incomplete or missing frame at end of stream (id = %s, position = %d)", this.id, Long.valueOf(this.bytesAdded - this.bytesBuffered));
                }
                if ($assertionsDisabled || !canRead()) {
                    throw new NoSuchElementException();
                }
                throw new AssertionError();
            }
            nextFrame = nextFrame();
        }
        return nextFrame;
    }

    @Override // org.apache.druid.frame.channel.ReadableFrameChannel
    public ListenableFuture<?> readabilityFuture() {
        synchronized (this.lock) {
            if (canRead() || isFinished()) {
                return Futures.immediateFuture((Object) null);
            }
            if (this.readyForReadingFuture != null) {
                return this.readyForReadingFuture;
            }
            SettableFuture<?> create = SettableFuture.create();
            this.readyForReadingFuture = create;
            return create;
        }
    }

    @Override // org.apache.druid.frame.channel.ReadableFrameChannel, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        synchronized (this.lock) {
            this.chunks.clear();
            this.nextCompressedFrameLength = -1L;
            this.noMoreWrites = true;
        }
    }

    public String getId() {
        return this.id;
    }

    public long getBytesAdded() {
        long j;
        synchronized (this.lock) {
            j = this.bytesAdded;
        }
        return j;
    }

    public boolean isErrorOrFinished() {
        boolean z;
        synchronized (this.lock) {
            z = isFinished() || (canRead() && !canReadFrame());
        }
        return z;
    }

    @VisibleForTesting
    long getBytesBuffered() {
        long j;
        synchronized (this.lock) {
            j = this.bytesBuffered;
        }
        return j;
    }

    private Frame nextFrame() {
        Memory region;
        synchronized (this.lock) {
            if (!canReadFrame()) {
                throw new ISE("Frame of size [%,d] not yet ready to read", Long.valueOf(this.nextCompressedFrameLength));
            }
            if (this.nextCompressedFrameLength > 2147483621) {
                throw new ISE("Cannot read frame of size [%,d] bytes", Long.valueOf(this.nextCompressedFrameLength));
            }
            int checkedCast = Ints.checkedCast(26 + this.nextCompressedFrameLength);
            region = copyFromQueuedChunks(checkedCast).region(1L, (26 + this.nextCompressedFrameLength) - 1);
            deleteFromQueuedChunks(checkedCast);
            updateStreamState();
        }
        Frame decompress = Frame.decompress(region, 0L, region.getCapacity());
        log.debug("Read frame with [%,d] rows and [%,d] bytes.", Integer.valueOf(decompress.numRows()), Long.valueOf(decompress.numBytes()));
        return decompress;
    }

    @GuardedBy("lock")
    private void updateStreamState() {
        if (this.streamPart == StreamPart.MAGIC && this.bytesBuffered >= FrameFileWriter.MAGIC.length) {
            if (!copyFromQueuedChunks(FrameFileWriter.MAGIC.length).equalTo(0L, Memory.wrap(FrameFileWriter.MAGIC), 0L, FrameFileWriter.MAGIC.length)) {
                throw new ISE("Invalid stream header (id = %s, position = %d)", this.id, Long.valueOf(this.bytesAdded - this.bytesBuffered));
            }
            this.streamPart = StreamPart.FRAMES;
            deleteFromQueuedChunks(FrameFileWriter.MAGIC.length);
        }
        if (this.streamPart == StreamPart.FRAMES && this.bytesBuffered >= 1) {
            Memory copyFromQueuedChunks = copyFromQueuedChunks(1);
            if (copyFromQueuedChunks.getByte(0L) == 1) {
                if (this.nextCompressedFrameLength == -1 && this.bytesBuffered >= 18) {
                    this.nextCompressedFrameLength = copyFromQueuedChunks(18).getLong(2L);
                    if (this.nextCompressedFrameLength <= 0 || this.nextCompressedFrameLength >= MAX_FRAME_SIZE_BYTES) {
                        throw new ISE("Invalid frame size (size = %,d B)", Long.valueOf(this.nextCompressedFrameLength));
                    }
                }
            } else {
                if (copyFromQueuedChunks.getByte(0L) != 2) {
                    throw new ISE("Invalid midstream marker (id = %s, position = %d)", this.id, Long.valueOf(this.bytesAdded - this.bytesBuffered));
                }
                this.streamPart = StreamPart.FOOTER;
                this.nextCompressedFrameLength = -1L;
            }
        }
        if (this.streamPart == StreamPart.FOOTER) {
            if (this.bytesBuffered > 0) {
                deleteFromQueuedChunks(this.bytesBuffered);
            }
            if (!$assertionsDisabled && (this.bytesBuffered != 0 || !this.chunks.isEmpty() || this.nextCompressedFrameLength != -1)) {
                throw new AssertionError();
            }
        }
        if (this.addChunkBackpressureFuture != null) {
            if (this.bytesBuffered < this.bytesLimit || !canReadFrame()) {
                this.addChunkBackpressureFuture.set((Object) null);
                this.addChunkBackpressureFuture = null;
            }
        }
    }

    @GuardedBy("lock")
    private boolean canReadError() {
        return this.chunks.size() > 0 && this.chunks.get(0).isError();
    }

    @GuardedBy("lock")
    private boolean canReadFrame() {
        return this.nextCompressedFrameLength != -1 && this.bytesBuffered >= 26 + this.nextCompressedFrameLength;
    }

    @GuardedBy("lock")
    private Memory copyFromQueuedChunks(int i) {
        if (this.bytesBuffered < i) {
            throw new IAE("Cannot copy [%,d] bytes, only have [%,d] buffered", Integer.valueOf(i), Long.valueOf(this.bytesBuffered));
        }
        WritableMemory allocate = WritableMemory.allocate(i, ByteOrder.LITTLE_ENDIAN);
        int i2 = 0;
        int i3 = 0;
        while (i3 < this.chunks.size()) {
            byte[] valueOrThrow = this.chunks.get(i3).valueOrThrow();
            int i4 = i3 == 0 ? this.positionInFirstChunk : 0;
            int min = Math.min(valueOrThrow.length - i4, i - i2);
            allocate.putByteArray(i2, valueOrThrow, i4, min);
            i2 += min;
            if (i2 == i) {
                break;
            }
            i3++;
        }
        return allocate;
    }

    @GuardedBy("lock")
    private void deleteFromQueuedChunks(long j) {
        if (this.bytesBuffered < j) {
            throw new IAE("Cannot delete [%,d] bytes, only have [%,d] buffered", Long.valueOf(j), Long.valueOf(this.bytesBuffered));
        }
        long j2 = j;
        while (j2 > 0) {
            int length = this.chunks.get(0).valueOrThrow().length - this.positionInFirstChunk;
            if (j2 >= length) {
                j2 -= length;
                this.positionInFirstChunk = 0;
                this.chunks.remove(0);
            } else {
                this.positionInFirstChunk = Ints.checkedCast(this.positionInFirstChunk + j2);
                j2 = 0;
            }
        }
        this.bytesBuffered -= j;
        this.nextCompressedFrameLength = -1L;
    }

    static {
        $assertionsDisabled = !ReadableByteChunksFrameChannel.class.desiredAssertionStatus();
        log = new Logger(ReadableByteChunksFrameChannel.class);
    }
}
