/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.transport;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import net.nmoncho.shaded.io.netty.buffer.ByteBuf;
import net.nmoncho.shaded.io.netty.channel.Channel;
import net.nmoncho.shaded.io.netty.channel.EventLoop;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.transport.CQLMessageHandler;
import org.apache.cassandra.transport.Envelope;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class Flusher
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Flusher.class);

    /**
     * Size limit, in bytes, applied to framed payloads; the expression evaluates to 131060.
     * An envelope whose size is at or above this value takes the large-message path in
     * {@code processFramedResponse}, and no payload buffer requested by this class exceeds it.
     * NOTE(review): the literals look like compiler-inlined constants (presumably a chunk/frame
     * maximum minus the larger of two frame header-and-trailer lengths) - confirm against FrameEncoder.
     */
    @VisibleForTesting
    public static final int MAX_FRAMED_PAYLOAD_SIZE = Math.min(131072, 131072 - Math.max(10, 12));

    /** Event loop on which this flusher is executed (and, for some subclasses, re-scheduled). */
    protected final EventLoop eventLoop;

    /** Items handed to {@link #enqueue(FlushItem)} that have not yet been drained by {@link #processQueue()}. */
    private final ConcurrentLinkedQueue<FlushItem<?>> queued = new ConcurrentLinkedQueue<>();

    /** Set by {@link #start()} when the flusher is submitted to the event loop; cleared by the subclasses' {@code run()}. */
    protected final AtomicBoolean scheduled = new AtomicBoolean(false);

    /** Items already handled by {@link #processQueue()} and awaiting release in {@link #flushWrittenChannels()}. */
    protected final List<FlushItem<?>> processed = new ArrayList<>();

    /** Channels that received an unframed write and still need to be flushed. */
    private final HashSet<Channel> channels = new HashSet<>();

    /** Per-channel accumulation of framed envelopes waiting to be encoded into payloads. */
    private final Map<Channel, FlushBuffer> payloads = new HashMap<>();

    /**
     * Creates a flusher of the {@link LegacyFlusher} variety bound to the given event loop.
     *
     * @param loop event loop the flusher will run on
     * @return a new {@code LegacyFlusher}
     */
    static Flusher legacy(EventLoop loop) {
        return new LegacyFlusher(loop);
    }

    /**
     * Creates a flusher of the {@link ImmediateFlusher} variety bound to the given event loop.
     *
     * @param loop event loop the flusher will run on
     * @return a new {@code ImmediateFlusher}
     */
    static Flusher immediate(EventLoop loop) {
        return new ImmediateFlusher(loop);
    }

    /**
     * Submits this flusher to its event loop unless it is already marked as scheduled.
     * The plain read is tried first; the compare-and-set then decides which caller
     * actually performs the submission.
     */
    void start() {
        if (this.scheduled.get()) {
            return;
        }
        if (!this.scheduled.compareAndSet(false, true)) {
            // Another caller flipped the flag between the read and the CAS.
            return;
        }
        this.eventLoop.execute(this);
    }

    /**
     * Private so that only the nested subclasses can extend this class; instances are
     * obtained through {@link #legacy(EventLoop)} and {@link #immediate(EventLoop)}.
     *
     * @param eventLoop event loop stored for later execution/scheduling of this flusher
     */
    private Flusher(EventLoop eventLoop) {
        this.eventLoop = eventLoop;
    }

    /**
     * Appends an item to the pending queue. This does not schedule the flusher;
     * {@link #start()} is a separate call.
     *
     * @param item the item to be processed on a later run
     */
    void enqueue(FlushItem<?> item) {
        this.queued.add(item);
    }

    /**
     * Removes and returns the head of the pending queue.
     *
     * @return the next queued item, or {@code null} when the queue is empty
     */
    FlushItem<?> poll() {
        return this.queued.poll();
    }

    /**
     * Reports whether the pending queue currently holds no items.
     *
     * @return {@code true} if nothing is queued
     */
    boolean isEmpty() {
        return this.queued.isEmpty();
    }

    /**
     * Writes an unframed response to its channel without flushing, then records the
     * channel so that {@link #flushWrittenChannels()} flushes it later.
     *
     * @param flush the unframed item to write
     */
    private void processUnframedResponse(FlushItem.Unframed flush) {
        final Channel target = flush.channel;
        target.write(flush.response, target.voidPromise());
        this.channels.add(target);
    }

    /**
     * Routes a framed response: envelopes whose size reaches {@link #MAX_FRAMED_PAYLOAD_SIZE}
     * are written out immediately via {@code flushLargeMessage}; smaller ones are appended
     * to the per-channel {@code FlushBuffer} (created on first use with an initial capacity
     * of 5) and encoded when {@link #flushWrittenChannels()} runs.
     *
     * @param flush the framed item to handle
     */
    private void processFramedResponse(FlushItem.Framed flush) {
        final Envelope envelope = flush.response;
        final Channel target = flush.channel;
        final FrameEncoder.PayloadAllocator allocator = flush.allocator;

        final boolean oversized = CQLMessageHandler.envelopeSize(envelope.header) >= MAX_FRAMED_PAYLOAD_SIZE;
        if (oversized) {
            this.flushLargeMessage(target, envelope, allocator);
            return;
        }

        final FlushBuffer pending = this.payloads.computeIfAbsent(target, ch -> new FlushBuffer(ch, allocator, 5));
        pending.add(envelope);
    }

    /**
     * Sends one large envelope as a sequence of payloads, each written and flushed as soon
     * as it is filled. The envelope header is encoded into the first payload only; the body
     * is then copied in chunks until no readable bytes remain. At least one payload is always
     * sent, even if the body is empty.
     *
     * @param channel   channel the payloads are written to
     * @param outbound  envelope supplying the header and the body
     * @param allocator source of payload buffers
     */
    private void flushLargeMessage(Channel channel, Envelope outbound, FrameEncoder.PayloadAllocator allocator) {
        final ByteBuf source = outbound.body;
        boolean headerPending = true;
        // The first pass must run unconditionally (the header still has to go out); after it,
        // the loop continues only while body bytes remain.
        do {
            final int wanted = Math.min(source.readableBytes(), MAX_FRAMED_PAYLOAD_SIZE);
            final FrameEncoder.Payload frame = allocator.allocate(false, wanted);
            if (logger.isTraceEnabled()) {
                logger.trace("Allocated initial buffer of {} for 1 large item", (Object)FBUtilities.prettyPrintMemory(frame.buffer.capacity()));
            }
            final ByteBuffer target = frame.buffer;
            // Cap the writable region when a full-size payload was requested.
            // NOTE(review): presumably guards against the allocator returning more room than asked for - confirm.
            if (wanted >= MAX_FRAMED_PAYLOAD_SIZE) {
                target.limit(MAX_FRAMED_PAYLOAD_SIZE);
            }
            if (headerPending) {
                outbound.encodeHeaderInto(target);
                headerPending = false;
            }
            // Copy as much of the body as fits in what is left of this buffer.
            final int chunk = Math.min(target.remaining(), source.readableBytes());
            if (chunk > 0) {
                target.put(source.slice(source.readerIndex(), chunk).nioBuffer());
            }
            source.readerIndex(source.readerIndex() + chunk);
            this.writeAndFlush(channel, frame);
        } while (source.readableBytes() > 0);
    }

    /**
     * Finishes the payload and then writes and flushes it on the channel, using the
     * channel's void promise.
     *
     * @param channel channel to write to
     * @param payload payload to finish and send
     */
    private void writeAndFlush(Channel channel, FrameEncoder.Payload payload) {
        // finish() is called before the payload is handed to the channel.
        payload.finish();
        channel.writeAndFlush((Object)payload, channel.voidPromise());
    }

    /**
     * Drains the pending queue, dispatching each item to the framed or unframed handler
     * according to its kind, and records every handled item in {@code processed} so it
     * can be released by {@link #flushWrittenChannels()}.
     *
     * @return {@code true} if at least one item was taken from the queue
     */
    protected boolean processQueue() {
        boolean handledAny = false;
        for (FlushItem<?> next = this.poll(); next != null; next = this.poll()) {
            if (next.kind != FlushItem.Kind.FRAMED) {
                this.processUnframedResponse((FlushItem.Unframed)next);
            } else {
                this.processFramedResponse((FlushItem.Framed)next);
            }
            this.processed.add(next);
            handledAny = true;
        }
        return handledAny;
    }

    /**
     * Completes the current batch: flushes every channel that received an unframed write,
     * encodes and sends every accumulated framed buffer, releases all processed items, and
     * finally resets the three bookkeeping collections.
     */
    protected void flushWrittenChannels() {
        // Unframed responses were only written so far; push them out now.
        this.channels.forEach(Channel::flush);
        // Framed responses were only accumulated so far; encode and send them.
        this.payloads.values().forEach(FlushBuffer::finish);
        // Hand each item back to its tidy callback.
        this.processed.forEach(FlushItem::release);

        this.processed.clear();
        this.channels.clear();
        this.payloads.clear();
    }

    /**
     * Flusher that drains the queue and flushes everything in a single run, without
     * re-scheduling itself.
     */
    private static final class ImmediateFlusher
    extends Flusher {
        private ImmediateFlusher(EventLoop eventLoop) {
            super(eventLoop);
        }

        /**
         * Clears the scheduled flag, drains the queue, and always flushes what was written,
         * even if draining throws.
         */
        @Override
        public void run() {
            // Cleared before draining, so a start() call made from here on submits the flusher again.
            this.scheduled.set(false);
            try {
                this.processQueue();
            }
            finally {
                this.flushWrittenChannels();
            }
        }
    }

    /**
     * Flusher that keeps re-scheduling itself on the event loop, batching flushes across
     * runs and standing down after a number of consecutive idle runs.
     */
    private static final class LegacyFlusher
    extends Flusher {
        /** A flush is forced once more than this many runs have passed since the last one. */
        private static final int MAX_RUNS_BETWEEN_FLUSHES = 2;
        /** A flush is forced once more than this many processed items are awaiting release. */
        private static final int MAX_UNFLUSHED_ITEMS = 50;
        /** The flusher tries to stand down once more than this many consecutive runs found no work. */
        private static final int MAX_IDLE_RUNS = 5;
        /** Delay, in nanoseconds, before the next scheduled run. */
        private static final long RESCHEDULE_DELAY_NANOS = 10000L;

        int runsSinceFlush = 0;
        int runsWithNoWork = 0;

        private LegacyFlusher(EventLoop eventLoop) {
            super(eventLoop);
        }

        /**
         * Drains the queue, flushes when the run was idle or a batching limit was exceeded,
         * and then either re-schedules itself or stops.
         */
        @Override
        public void run() {
            final boolean doneWork = this.processQueue();
            this.runsSinceFlush++;

            final boolean flushDue = !doneWork
                                     || this.runsSinceFlush > MAX_RUNS_BETWEEN_FLUSHES
                                     || this.processed.size() > MAX_UNFLUSHED_ITEMS;
            if (flushDue) {
                this.flushWrittenChannels();
                this.runsSinceFlush = 0;
            }

            if (doneWork) {
                this.runsWithNoWork = 0;
            } else {
                this.runsWithNoWork++;
                if (this.runsWithNoWork > MAX_IDLE_RUNS) {
                    // Release the scheduled flag first, then re-check the queue: if something
                    // arrived in the meantime, try to take the flag back and keep running.
                    this.scheduled.set(false);
                    final boolean reclaimed = !this.isEmpty() && this.scheduled.compareAndSet(false, true);
                    if (!reclaimed) {
                        return;
                    }
                }
            }
            this.eventLoop.schedule(this, RESCHEDULE_DELAY_NANOS, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Envelopes accumulated for a single channel, together with the running total of their
     * sizes. {@link #finish()} encodes them into one or more payloads and sends those on
     * the channel.
     */
    private class FlushBuffer
    extends ArrayList<Envelope> {
        private final Channel channel;
        private final FrameEncoder.PayloadAllocator allocator;
        /** Sum of the envelope sizes of everything added through {@link #add(Envelope)}. */
        private int sizeInBytes = 0;

        FlushBuffer(Channel channel, FrameEncoder.PayloadAllocator allocator, int initialCapacity) {
            super(initialCapacity);
            this.channel = channel;
            this.allocator = allocator;
        }

        /**
         * Adds the envelope and accounts for its size in the running byte total.
         */
        @Override
        public boolean add(Envelope toFlush) {
            this.sizeInBytes += CQLMessageHandler.envelopeSize(toFlush.header);
            return super.add(toFlush);
        }

        /**
         * Requests a payload of {@code requiredBytes}, capped at {@link #MAX_FRAMED_PAYLOAD_SIZE}.
         *
         * @param requiredBytes bytes still to be written
         * @param maxItems      number of envelopes still to be written; used for trace logging only
         * @return the allocated payload
         */
        private FrameEncoder.Payload allocate(int requiredBytes, int maxItems) {
            final int capped = Math.min(requiredBytes, MAX_FRAMED_PAYLOAD_SIZE);
            final FrameEncoder.Payload fresh = this.allocator.allocate(true, capped);
            // If the payload reports at least the maximum as remaining, shrink its limit
            // to exactly the requested size.
            if (fresh.remaining() >= MAX_FRAMED_PAYLOAD_SIZE) {
                fresh.buffer.limit(fresh.buffer.position() + capped);
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Allocated initial buffer of {} for up to {} items", (Object)FBUtilities.prettyPrintMemory(fresh.buffer.capacity()), (Object)maxItems);
            }
            return fresh;
        }

        /**
         * Encodes every accumulated envelope, in insertion order, into payloads and sends
         * them. When the next envelope does not fit in the current payload, that payload is
         * sent and a new one is allocated for the bytes not yet written.
         */
        public void finish() {
            int bytesEncoded = 0;
            int itemsLeft = this.size();
            FrameEncoder.Payload current = this.allocate(this.sizeInBytes, itemsLeft);
            for (Envelope envelope : this) {
                final int needed = CQLMessageHandler.envelopeSize(envelope.header);
                if (needed > current.remaining()) {
                    Flusher.this.writeAndFlush(this.channel, current);
                    current = this.allocate(this.sizeInBytes - bytesEncoded, itemsLeft);
                }
                envelope.encodeInto(current.buffer);
                bytesEncoded += needed;
                itemsLeft--;
            }
            // Send whatever is in the last payload.
            Flusher.this.writeAndFlush(this.channel, current);
        }
    }

    /**
     * A response waiting to be written to a channel, paired with the request it answers
     * and a callback that is invoked when the item is released.
     *
     * @param <T> type of the response carried by the item
     */
    static class FlushItem<T> {
        /** Distinguishes the two concrete item flavours. */
        enum Kind {
            FRAMED,
            UNFRAMED;

        }

        final Kind kind;
        final Channel channel;
        final T response;
        final Envelope request;
        /** Callback run by {@link #release()}; it receives this item. */
        final Consumer<FlushItem<T>> tidy;

        FlushItem(Kind kind, Channel channel, T response, Envelope request, Consumer<FlushItem<T>> tidy) {
            this.kind = kind;
            this.channel = channel;
            this.response = response;
            this.request = request;
            this.tidy = tidy;
        }

        /** Passes this item to its {@code tidy} callback. */
        void release() {
            this.tidy.accept(this);
        }

        /** Item of kind {@link Kind#FRAMED}: carries an {@link Envelope} plus the allocator used for its payloads. */
        static class Framed
        extends FlushItem<Envelope> {
            final FrameEncoder.PayloadAllocator allocator;

            Framed(Channel channel, Envelope response, Envelope request, FrameEncoder.PayloadAllocator allocator, Consumer<FlushItem<Envelope>> tidy) {
                super(Kind.FRAMED, channel, response, request, tidy);
                this.allocator = allocator;
            }
        }

        /** Item of kind {@link Kind#UNFRAMED}: carries a {@link Message.Response}. */
        static class Unframed
        extends FlushItem<Message.Response> {
            Unframed(Channel channel, Message.Response response, Envelope request, Consumer<FlushItem<Message.Response>> tidy) {
                super(Kind.UNFRAMED, channel, response, request, tidy);
            }
        }
    }
}

