package oadd.org.apache.drill.exec.work.batch;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import oadd.com.google.common.base.Joiner;
import oadd.com.google.common.base.Preconditions;
import oadd.com.google.common.base.Stopwatch;
import oadd.com.google.common.collect.Queues;
import oadd.io.netty.buffer.DrillBuf;
import oadd.org.apache.drill.exec.ExecConstants;
import oadd.org.apache.drill.exec.memory.BufferAllocator;
import oadd.org.apache.drill.exec.memory.OutOfMemoryException;
import oadd.org.apache.drill.exec.ops.FragmentContext;
import oadd.org.apache.drill.exec.proto.BitData;
import oadd.org.apache.drill.exec.proto.ExecProtos;
import oadd.org.apache.drill.exec.proto.helper.QueryIdHelper;
import oadd.org.apache.drill.exec.record.RawFragmentBatch;
import oadd.org.apache.drill.exec.store.LocalSyncableFileSystem;
import oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:oadd/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.class */
public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatchWrapper> {
    static final Logger logger;
    private static String DRILL_LOCAL_IMPL_STRING;
    private static final float STOP_SPOOLING_FRACTION = 0.5f;
    public static final long ALLOCATOR_INITIAL_RESERVATION = 1048576;
    public static final long ALLOCATOR_MAX_RESERVATION = 20000000000L;
    private final BufferAllocator allocator;
    private final long threshold;
    private final int oppositeId;
    private final int bufferIndex;
    private volatile SpoolingState spoolingState;
    private volatile long currentSizeInMemory;
    private volatile Spooler spooler;
    private FileSystem fs;
    private Path path;
    private FSDataOutputStream outputStream;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:oadd/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer$RawFragmentBatchWrapper.class */
    public class RawFragmentBatchWrapper {
        private RawFragmentBatch batch;
        private volatile boolean available;
        private CountDownLatch latch;
        private volatile int bodyLength;
        private volatile boolean outOfMemory = false;
        private long start = -1;
        private long check;
        static final /* synthetic */ boolean $assertionsDisabled;

        public RawFragmentBatchWrapper(RawFragmentBatch rawFragmentBatch, boolean z) {
            Preconditions.checkNotNull(rawFragmentBatch);
            this.batch = rawFragmentBatch;
            this.available = z;
            this.latch = new CountDownLatch(z ? 0 : 1);
            if (z) {
                rawFragmentBatch.sendOk();
            }
        }

        public boolean isNull() {
            return this.batch == null;
        }

        public RawFragmentBatch get() throws InterruptedException, IOException {
            if (this.available) {
                if ($assertionsDisabled || this.batch.getHeader() != null) {
                    return this.batch;
                }
                throw new AssertionError("batch header null");
            }
            this.latch.await();
            readFromStream();
            this.available = true;
            return this.batch;
        }

        public long getBodySize() {
            if (this.batch.getBody() == null) {
                return 0L;
            }
            if ($assertionsDisabled || this.batch.getBody().readableBytes() >= 0) {
                return this.batch.getBody().readableBytes();
            }
            throw new AssertionError();
        }

        public void writeToStream(FSDataOutputStream fSDataOutputStream) throws IOException {
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.start();
            this.available = false;
            this.check = ThreadLocalRandom.current().nextLong();
            this.start = fSDataOutputStream.getPos();
            SpoolingRawBatchBuffer.logger.debug("Writing check value {} at position {}", Long.valueOf(this.check), Long.valueOf(this.start));
            fSDataOutputStream.writeLong(this.check);
            this.batch.getHeader().writeDelimitedTo(fSDataOutputStream);
            DrillBuf body = this.batch.getBody();
            if (body != null) {
                this.bodyLength = body.capacity();
            } else {
                this.bodyLength = 0;
            }
            if (this.bodyLength > 0) {
                body.getBytes(0, (OutputStream) fSDataOutputStream, this.bodyLength);
            }
            fSDataOutputStream.hsync();
            SpoolingRawBatchBuffer.logger.debug("After spooling batch, stream at position {}. File length {}", Long.valueOf(fSDataOutputStream.getPos()), Long.valueOf(SpoolingRawBatchBuffer.this.fs.getFileStatus(SpoolingRawBatchBuffer.this.path).getLen()));
            this.batch.sendOk();
            this.latch.countDown();
            long elapsed = stopwatch.elapsed(TimeUnit.MICROSECONDS);
            SpoolingRawBatchBuffer.logger.debug("Took {} us to spool {} to disk. Rate {} mb/s", Long.valueOf(elapsed), Integer.valueOf(this.bodyLength), Long.valueOf(this.bodyLength / elapsed));
            if (body != null) {
                body.release();
            }
        }

        /* JADX WARN: Finally extract failed */
        public void readFromStream() throws IOException, InterruptedException {
            InputStream open;
            Throwable th;
            DrillBuf buffer;
            Throwable th2;
            long pos;
            long readLong;
            long j = this.start;
            boolean z = true;
            int i = 0;
            while (z) {
                Thread.sleep(i);
                try {
                    try {
                        open = SpoolingRawBatchBuffer.this.fs.open(SpoolingRawBatchBuffer.this.path);
                        th = null;
                        try {
                            buffer = SpoolingRawBatchBuffer.this.allocator.buffer(this.bodyLength);
                            th2 = null;
                            try {
                                open.seek(this.start);
                                pos = open.getPos();
                                readLong = open.readLong();
                                open.getPos();
                            } catch (Throwable th3) {
                                if (buffer != null) {
                                    if (0 != 0) {
                                        try {
                                            buffer.close();
                                        } catch (Throwable th4) {
                                            th2.addSuppressed(th4);
                                        }
                                    } else {
                                        buffer.close();
                                    }
                                }
                                throw th3;
                            }
                        } catch (Throwable th5) {
                            if (open != null) {
                                if (0 != 0) {
                                    try {
                                        open.close();
                                    } catch (Throwable th6) {
                                        th.addSuppressed(th6);
                                    }
                                } else {
                                    open.close();
                                }
                            }
                            throw th5;
                        }
                    } catch (Throwable th7) {
                        if (z && this.batch != null) {
                            this.batch.getBody().release();
                        }
                        throw th7;
                    }
                } catch (EOFException e) {
                    SpoolingRawBatchBuffer.logger.warn("EOF reading from file {} at pos {}. Current file size: {}", SpoolingRawBatchBuffer.this.path, Long.valueOf(j), Long.valueOf(SpoolingRawBatchBuffer.this.fs.getFileStatus(SpoolingRawBatchBuffer.this.path).getLen()));
                    i = Math.max(1, i * 2);
                    if (i >= 60000) {
                        throw e;
                    }
                    if (z && this.batch != null) {
                        this.batch.getBody().release();
                    }
                }
                if (!$assertionsDisabled && readLong != this.check) {
                    throw new AssertionError(String.format("Check values don't match: %d %d, Position %d", Long.valueOf(this.check), Long.valueOf(readLong), Long.valueOf(pos)));
                }
                Stopwatch stopwatch = new Stopwatch();
                stopwatch.start();
                BitData.FragmentRecordBatch parseDelimitedFrom = BitData.FragmentRecordBatch.parseDelimitedFrom(open);
                open.getPos();
                if (!$assertionsDisabled && parseDelimitedFrom == null) {
                    throw new AssertionError("header null after parsing from stream");
                }
                buffer.writeBytes(open, this.bodyLength);
                j = open.getPos();
                this.batch = new RawFragmentBatch(parseDelimitedFrom, buffer, null);
                this.available = true;
                this.latch.countDown();
                long elapsed = stopwatch.elapsed(TimeUnit.MICROSECONDS);
                SpoolingRawBatchBuffer.logger.debug("Took {} us to read {} from disk. Rate {} mb/s", Long.valueOf(elapsed), Integer.valueOf(this.bodyLength), Long.valueOf(this.bodyLength / elapsed));
                z = false;
                if (buffer != null) {
                    if (0 != 0) {
                        try {
                            buffer.close();
                        } catch (Throwable th8) {
                            th2.addSuppressed(th8);
                        }
                    } else {
                        buffer.close();
                    }
                }
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th9) {
                            th.addSuppressed(th9);
                        }
                    } else {
                        open.close();
                    }
                }
                if (0 != 0 && this.batch != null) {
                    this.batch.getBody().release();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isOutOfMemory() {
            return this.outOfMemory;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setOutOfMemory(boolean z) {
            this.outOfMemory = z;
        }

        static {
            $assertionsDisabled = !SpoolingRawBatchBuffer.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:oadd/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer$Spooler.class */
    public class Spooler extends Thread {
        private final LinkedBlockingDeque<RawFragmentBatchWrapper> spoolingQueue;
        private volatile boolean shouldContinue = true;
        private Thread spoolingThread;

        public Spooler(String str) {
            setDaemon(true);
            setName(str);
            this.spoolingQueue = Queues.newLinkedBlockingDeque();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.shouldContinue) {
                try {
                    try {
                        try {
                            try {
                                this.spoolingQueue.take().writeToStream(SpoolingRawBatchBuffer.this.outputStream);
                            } catch (IOException e) {
                                SpoolingRawBatchBuffer.this.context.fail(e);
                            }
                        } catch (InterruptedException e2) {
                            if (!this.shouldContinue) {
                                break;
                            }
                        }
                    } catch (Throwable th) {
                        SpoolingRawBatchBuffer.this.context.fail(th);
                        SpoolingRawBatchBuffer.logger.info("Spooler thread exiting");
                        return;
                    }
                } catch (Throwable th2) {
                    SpoolingRawBatchBuffer.logger.info("Spooler thread exiting");
                    throw th2;
                }
            }
            SpoolingRawBatchBuffer.logger.info("Spooler thread exiting");
        }

        public void addBatchForSpooling(RawFragmentBatchWrapper rawFragmentBatchWrapper) {
            if (SpoolingRawBatchBuffer.this.isSpoolingStopped()) {
                this.spoolingQueue.add(rawFragmentBatchWrapper);
                return;
            }
            rawFragmentBatchWrapper.available = true;
            rawFragmentBatchWrapper.batch.sendOk();
            rawFragmentBatchWrapper.latch.countDown();
        }

        public void terminate() {
            SpoolingRawBatchBuffer.this.stopSpooling();
            this.shouldContinue = false;
            if (this.spoolingThread.isAlive()) {
                this.spoolingThread.interrupt();
            }
        }
    }

    /* loaded from: input_file:oadd/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer$SpoolingBufferQueue.class */
    private class SpoolingBufferQueue implements BaseRawBatchBuffer.BufferQueue<RawFragmentBatchWrapper> {
        private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer;

        private SpoolingBufferQueue() {
            this.buffer = Queues.newLinkedBlockingDeque();
        }

        @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer.BufferQueue
        public void addOomBatch(RawFragmentBatch rawFragmentBatch) {
            RawFragmentBatchWrapper rawFragmentBatchWrapper = new RawFragmentBatchWrapper(rawFragmentBatch, true);
            rawFragmentBatchWrapper.setOutOfMemory(true);
            this.buffer.addFirst(rawFragmentBatchWrapper);
        }

        @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer.BufferQueue
        public RawFragmentBatch poll() throws IOException {
            RawFragmentBatchWrapper poll = this.buffer.poll();
            if (poll == null) {
                return null;
            }
            try {
                return poll.get();
            } catch (InterruptedException e) {
                return null;
            }
        }

        @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer.BufferQueue
        public RawFragmentBatch take() throws IOException, InterruptedException {
            return this.buffer.take().get();
        }

        @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer.BufferQueue
        public boolean checkForOutOfMemory() {
            return this.buffer.peek().isOutOfMemory();
        }

        @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer.BufferQueue
        public int size() {
            return this.buffer.size();
        }

        @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer.BufferQueue
        public boolean isEmpty() {
            return this.buffer.size() == 0;
        }

        @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer.BufferQueue
        public void add(RawFragmentBatchWrapper rawFragmentBatchWrapper) {
            this.buffer.add(rawFragmentBatchWrapper);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:oadd/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer$SpoolingState.class */
    public enum SpoolingState {
        NOT_SPOOLING,
        SPOOLING,
        PAUSE_SPOOLING,
        STOP_SPOOLING
    }

    public SpoolingRawBatchBuffer(FragmentContext fragmentContext, int i, int i2, int i3) throws IOException, OutOfMemoryException {
        super(fragmentContext, i);
        this.currentSizeInMemory = 0L;
        this.allocator = fragmentContext.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION, true);
        this.threshold = fragmentContext.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
        this.oppositeId = i2;
        this.bufferIndex = i3;
        this.bufferQueue = new SpoolingBufferQueue();
    }

    private synchronized void setSpoolingState(SpoolingState spoolingState) {
        SpoolingState spoolingState2 = this.spoolingState;
        if (spoolingState == SpoolingState.NOT_SPOOLING || spoolingState2 == SpoolingState.STOP_SPOOLING) {
            return;
        }
        this.spoolingState = spoolingState;
    }

    private boolean isCurrentlySpooling() {
        return this.spoolingState == SpoolingState.SPOOLING;
    }

    private void startSpooling() {
        setSpoolingState(SpoolingState.SPOOLING);
    }

    private void pauseSpooling() {
        setSpoolingState(SpoolingState.PAUSE_SPOOLING);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isSpoolingStopped() {
        return this.spoolingState == SpoolingState.STOP_SPOOLING;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void stopSpooling() {
        setSpoolingState(SpoolingState.STOP_SPOOLING);
    }

    public String getDir() {
        List stringList = this.context.getConfig().getStringList(ExecConstants.TEMP_DIRECTORIES);
        return (String) stringList.get(ThreadLocalRandom.current().nextInt(stringList.size()));
    }

    private synchronized void initSpooler() throws IOException {
        if (this.spooler != null) {
            return;
        }
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", this.context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM));
        configuration.set(DRILL_LOCAL_IMPL_STRING, LocalSyncableFileSystem.class.getName());
        this.fs = FileSystem.get(configuration);
        this.path = getPath();
        this.outputStream = this.fs.create(this.path);
        this.spooler = new Spooler(QueryIdHelper.getExecutorThreadName(this.context.getHandle()).concat(":Spooler-" + this.oppositeId + "-" + this.bufferIndex));
        this.spooler.start();
    }

    @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer
    protected void enqueueInner(RawFragmentBatch rawFragmentBatch) throws IOException {
        if (!$assertionsDisabled && rawFragmentBatch.getHeader().getSendingMajorFragmentId() != this.oppositeId) {
            throw new AssertionError();
        }
        logger.debug("Enqueue batch. Current buffer size: {}. Last batch: {}. Sending fragment: {}", Integer.valueOf(this.bufferQueue.size()), Boolean.valueOf(rawFragmentBatch.getHeader().getIsLastBatch()), Integer.valueOf(rawFragmentBatch.getHeader().getSendingMajorFragmentId()));
        boolean isCurrentlySpooling = isCurrentlySpooling();
        RawFragmentBatchWrapper rawFragmentBatchWrapper = new RawFragmentBatchWrapper(rawFragmentBatch, !isCurrentlySpooling);
        this.currentSizeInMemory += rawFragmentBatchWrapper.getBodySize();
        if (isCurrentlySpooling) {
            if (this.spooler == null) {
                initSpooler();
            }
            this.spooler.addBatchForSpooling(rawFragmentBatchWrapper);
        }
        this.bufferQueue.add(rawFragmentBatchWrapper);
        if (isCurrentlySpooling || this.currentSizeInMemory <= this.threshold) {
            return;
        }
        logger.debug("Buffer size {} greater than threshold {}. Start spooling to disk", Long.valueOf(this.currentSizeInMemory), Long.valueOf(this.threshold));
        startSpooling();
    }

    @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer, oadd.org.apache.drill.exec.record.RawFragmentBatchProvider
    public void kill(FragmentContext fragmentContext) {
        this.allocator.close();
        if (this.spooler != null) {
            this.spooler.terminate();
        }
    }

    @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer
    protected void upkeep(RawFragmentBatch rawFragmentBatch) {
        if (rawFragmentBatch.getHeader().getIsOutOfMemory()) {
            this.outOfMemory.set(true);
            return;
        }
        if (rawFragmentBatch.getBody() != null) {
            this.currentSizeInMemory -= r0.capacity();
        }
        if (isCurrentlySpooling() && ((float) this.currentSizeInMemory) < ((float) this.threshold) * 0.5f) {
            logger.debug("buffer size {} less than {}x threshold. Stop spooling.", Long.valueOf(this.currentSizeInMemory), Float.valueOf(0.5f));
            pauseSpooling();
        }
        logger.debug("Got batch. Current buffer size: {}", Integer.valueOf(this.bufferQueue.size()));
    }

    @Override // oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer, java.lang.AutoCloseable
    public void close() {
        if (this.spooler != null) {
            this.spooler.terminate();
            while (this.spooler.isAlive()) {
                try {
                    this.spooler.join();
                } catch (InterruptedException e) {
                    logger.warn("Interrupted while waiting for spooling thread to exit");
                }
            }
        }
        this.allocator.close();
        try {
            if (this.outputStream != null) {
                this.outputStream.close();
            }
        } catch (IOException e2) {
            logger.warn("Failed to cleanup I/O streams", (Throwable) e2);
        }
        if (this.context.getConfig().getBoolean(ExecConstants.SPOOLING_BUFFER_DELETE)) {
            try {
                if (this.fs != null) {
                    this.fs.delete(this.path, false);
                    logger.debug("Deleted file {}", this.path.toString());
                }
            } catch (IOException e3) {
                logger.warn("Failed to delete temporary files", (Throwable) e3);
            }
        }
        super.close();
    }

    private Path getPath() {
        ExecProtos.FragmentHandle handle = this.context.getHandle();
        return new Path(Joiner.on("/").join(getDir(), QueryIdHelper.getQueryId(handle.getQueryId()), Integer.valueOf(handle.getMajorFragmentId()), Integer.valueOf(handle.getMinorFragmentId()), Integer.valueOf(this.oppositeId), Integer.valueOf(this.bufferIndex)));
    }

    static {
        $assertionsDisabled = !SpoolingRawBatchBuffer.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(SpoolingRawBatchBuffer.class);
        DRILL_LOCAL_IMPL_STRING = "fs.drill-local.impl";
    }
}
