/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.beats.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Inflater;
import java.util.zip.InflaterOutputStream;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.beats.protocol.Batch;
import org.apache.nifi.processors.beats.protocol.BatchMessage;
import org.apache.nifi.processors.beats.protocol.FrameType;
import org.apache.nifi.processors.beats.protocol.FrameTypeDecoder;
import org.apache.nifi.processors.beats.protocol.ProtocolCodeDecoder;
import org.apache.nifi.processors.beats.protocol.ProtocolException;
import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
import org.apache.nifi.processors.beats.protocol.ProtocolVersionDecoder;

/**
 * Byte-to-message decoder that reads Beats protocol frames from a channel and emits a
 * {@link Batch} once the number of received JSON messages matches the announced window size.
 *
 * <p>Parsing progress (version, frame type, sequence number, payload size, compressed size) is
 * kept in fields between {@code decode} invocations, so a frame that arrives split across
 * several reads is resumed where the previous invocation stopped. Each partial read that cannot
 * continue simply returns and waits for more bytes.</p>
 *
 * <p>Three frame types are handled: window size, JSON and compressed. A compressed frame is
 * inflated into a temporary buffer and its content is decoded recursively through the same
 * state machine. Malformed input is reported with {@link ProtocolException}.</p>
 */
public class BatchDecoder extends ByteToMessageDecoder {
    private static final int INITIAL_WINDOW_SIZE = 1;
    private static final int INITIAL_QUEUE_SIZE = 1;
    private static final int CODE_READABLE_BYTES = 1;
    private static final int INT_READABLE_BYTES = 4;
    private static final ProtocolCodeDecoder<ProtocolVersion> VERSION_DECODER = new ProtocolVersionDecoder();
    private static final ProtocolCodeDecoder<FrameType> FRAME_TYPE_DECODER = new FrameTypeDecoder();

    private final ComponentLog log;
    private final AtomicReference<ProtocolVersion> versionRef = new AtomicReference<>();
    private final AtomicReference<FrameType> frameTypeRef = new AtomicReference<>();
    private final AtomicInteger windowSize = new AtomicInteger(INITIAL_WINDOW_SIZE);
    private final AtomicReference<Integer> sequenceNumberRef = new AtomicReference<>();
    private final AtomicReference<Integer> payloadSizeRef = new AtomicReference<>();
    private final AtomicReference<Integer> compressedSizeRef = new AtomicReference<>();

    // Bounded by the current window size. A linked queue is used because it allocates per
    // element; an array-backed queue would allocate the full, client-announced capacity up front.
    private Queue<BatchMessage> batchMessages = new LinkedBlockingQueue<>(INITIAL_QUEUE_SIZE);

    /**
     * Creates a decoder.
     *
     * @param log component log used for trace and debug output; must not be null
     * @throws NullPointerException if {@code log} is null
     */
    public BatchDecoder(ComponentLog log) {
        this.log = Objects.requireNonNull(log, "Component Log required");
    }

    /**
     * Decodes as much of one frame as the buffer currently allows.
     *
     * <p>Implements the decode callback of {@link ByteToMessageDecoder}. When the version byte
     * is not yet available nothing happens and the method returns.</p>
     *
     * @param context channel handler context
     * @param buffer  inbound bytes
     * @param objects output list that receives completed {@link Batch} objects
     * @throws ProtocolException if protocol version 1 is received or the frame is malformed
     */
    protected void decode(ChannelHandlerContext context, ByteBuf buffer, List<Object> objects) {
        final ProtocolVersion protocolVersion = readVersion(buffer);
        if (ProtocolVersion.VERSION_2 == protocolVersion) {
            final FrameType frameType = readFrameType(buffer);
            decodeFrameType(frameType, context, buffer, objects);
        } else if (ProtocolVersion.VERSION_1 == protocolVersion) {
            throw new ProtocolException("Protocol Version [1] not supported");
        }
    }

    /**
     * Dispatches to the handler for the given frame type.
     *
     * @param frameType frame type, or null when the frame type byte is not yet readable
     * @throws ProtocolException if the frame type has no handler
     */
    private void decodeFrameType(FrameType frameType, ChannelHandlerContext context, ByteBuf buffer, List<Object> batches) {
        if (frameType == null) {
            log.trace("Frame Type not found");
        } else if (FrameType.COMPRESSED == frameType) {
            processCompressed(context, buffer, batches);
        } else if (FrameType.WINDOW_SIZE == frameType) {
            processWindowSize(context, buffer);
        } else if (FrameType.JSON == frameType) {
            processJson(context, buffer, batches);
        } else {
            final String message = String.format("Frame Type [%s] not supported", frameType);
            throw new ProtocolException(message);
        }
    }

    /**
     * Reads the window size and prepares a new message queue bounded by it.
     *
     * @throws ProtocolException if the announced window size is less than one
     */
    private void processWindowSize(ChannelHandlerContext context, ByteBuf buffer) {
        final Integer readWindowSize = readUnsignedInteger(buffer);
        if (readWindowSize == null) {
            log.trace("State [Read Window Size] not enough readable bytes");
            return;
        }

        // Validate before touching any state: a window of zero can never be filled and is
        // not a valid queue capacity.
        if (readWindowSize < 1) {
            final String message = String.format("Window Size [%d] not valid", readWindowSize);
            throw new ProtocolException(message);
        }

        windowSize.getAndSet(readWindowSize);
        batchMessages = new LinkedBlockingQueue<>(readWindowSize);
        resetFrameTypeVersion();

        final Channel channel = context.channel();
        log.debug("Processed Window Size [{}] Local [{}] Remote [{}]", readWindowSize, channel.localAddress(), channel.remoteAddress());
    }

    /**
     * Reads the compressed size and, once the whole compressed payload is buffered, inflates
     * and decodes it. Returns without consuming the payload when more bytes are needed.
     */
    private void processCompressed(ChannelHandlerContext context, ByteBuf buffer, List<Object> batches) {
        final Integer readCompressedSize = readCompressedSize(buffer);
        if (readCompressedSize == null) {
            log.trace("State [Read Compressed] not enough readable bytes");
            return;
        }

        final int readableBytes = buffer.readableBytes();
        if (readableBytes >= readCompressedSize) {
            final Channel channel = context.channel();
            log.debug("Processing Compressed Size [{}] Local [{}] Remote [{}]", readCompressedSize, channel.localAddress(), channel.remoteAddress());
            processCompressed(context, buffer, readCompressedSize, batches);
        } else {
            log.trace("State [Read Compressed] not enough readable bytes [{}] for compressed [{}]", readableBytes, readCompressedSize);
        }
    }

    /**
     * Inflates {@code compressedSize} bytes from the buffer into a temporary buffer and decodes
     * every frame contained in the inflated data.
     *
     * @throws ProtocolException if inflating fails or the inflated data ends with an
     *                           incomplete frame
     */
    private void processCompressed(ChannelHandlerContext context, ByteBuf buffer, int compressedSize, List<Object> batches) {
        final ByteBuf inflated = context.alloc().buffer(compressedSize);
        try {
            readCompressedBuffer(buffer, inflated, compressedSize);

            // The compressed frame has been consumed: clear all per-frame state so the frames
            // inside the inflated data, including a nested compressed frame, start clean.
            compressedSizeRef.set(null);
            resetSequenceVersionPayloadSize();
            resetFrameTypeVersion();

            while (inflated.isReadable()) {
                final int startIndex = inflated.readerIndex();
                decode(context, inflated, batches);

                // The inflated buffer will never receive more bytes. A decode pass that
                // consumed nothing is waiting for data that cannot arrive, so fail instead of
                // spinning forever on the same bytes.
                if (inflated.readerIndex() == startIndex) {
                    final int remaining = inflated.readableBytes();
                    resetSequenceVersionPayloadSize();
                    resetFrameTypeVersion();
                    final String message = String.format(
                            "Compressed Payload Size [%d] contains incomplete frame with [%d] bytes remaining",
                            compressedSize,
                            remaining
                    );
                    throw new ProtocolException(message);
                }
            }
        } finally {
            compressedSizeRef.set(null);
            inflated.release();
        }
    }

    /**
     * Reads the sequence number and payload size of a JSON frame and, when both are known,
     * continues with reading the payload.
     */
    private void processJson(ChannelHandlerContext context, ByteBuf buffer, List<Object> batches) {
        final Channel channel = context.channel();

        final Integer sequenceNumber = readSequenceNumber(buffer);
        if (sequenceNumber == null) {
            log.trace("State [Read JSON] Sequence Number not found Remote [{}]", channel.remoteAddress());
            return;
        }

        final Integer payloadSize = readPayloadSize(buffer);
        if (payloadSize == null) {
            log.trace("State [Read JSON] Payload Size not found Remote [{}]", channel.remoteAddress());
            return;
        }

        processJson(sequenceNumber, payloadSize, context, buffer, batches);
    }

    /**
     * Reads the JSON payload when it is fully buffered and adds the resulting message to the
     * current batch.
     */
    private void processJson(int sequenceNumber, int payloadSize, ChannelHandlerContext context, ByteBuf buffer, List<Object> batches) {
        final Channel channel = context.channel();
        final BatchMessage batchMessage = readJsonMessage(context, sequenceNumber, payloadSize, buffer);
        if (batchMessage == null) {
            log.trace("State [Read JSON] Message not found Remote [{}]", channel.remoteAddress());
        } else {
            processBatchMessage(batchMessage, batches);
            log.debug("Processed JSON Message Sequence Number [{}] Payload Size [{}] Local [{}] Remote [{}]",
                    sequenceNumber, payloadSize, channel.localAddress(), channel.remoteAddress());
        }
    }

    /**
     * Reads {@code payloadSize} bytes into a new message.
     *
     * @return the message, or null when fewer than {@code payloadSize} bytes are readable
     */
    private BatchMessage readJsonMessage(ChannelHandlerContext context, int sequenceNumber, int payloadSize, ByteBuf buffer) {
        final int readableBytes = buffer.readableBytes();
        if (readableBytes < payloadSize) {
            log.trace("State [Read JSON] Sequence Number [{}] not enough readable bytes [{}] for payload [{}]",
                    sequenceNumber, readableBytes, payloadSize);
            return null;
        }

        final byte[] payload = new byte[payloadSize];
        buffer.readBytes(payload);
        final String sender = getRemoteHostAddress(context.channel());
        return new BatchMessage(sender, payload, sequenceNumber);
    }

    /**
     * Resolves a textual sender address for the channel.
     *
     * @return the IP address string for a resolved internet socket address, the host string
     *         for an unresolved one, otherwise the string form of the remote address
     */
    private String getRemoteHostAddress(Channel channel) {
        final SocketAddress remoteAddress = channel.remoteAddress();
        if (remoteAddress instanceof InetSocketAddress) {
            final InetSocketAddress remoteSocketAddress = (InetSocketAddress) remoteAddress;
            final InetAddress address = remoteSocketAddress.getAddress();
            // getAddress() is null for an unresolved socket address
            return address == null ? remoteSocketAddress.getHostString() : address.getHostAddress();
        }
        // String.valueOf tolerates a null remote address instead of failing the decode
        return String.valueOf(remoteAddress);
    }

    /**
     * Queues the message and emits a {@link Batch} when the queue size reaches the window size.
     *
     * @throws ProtocolException if the queue is already at capacity
     */
    private void processBatchMessage(BatchMessage batchMessage, List<Object> batches) {
        if (!batchMessages.offer(batchMessage)) {
            final String message = String.format("Received message exceeds Window Size [%d]", windowSize.get());
            throw new ProtocolException(message);
        }

        resetSequenceVersionPayloadSize();
        resetFrameTypeVersion();

        if (windowSize.get() == batchMessages.size()) {
            // Copy before the queue is cleared by resetWindowSize()
            batches.add(new Batch(new ArrayList<>(batchMessages)));
            resetWindowSize();
        }
    }

    /**
     * Inflates {@code compressedSize} bytes from {@code compressedBuffer} into {@code inflated}.
     *
     * @throws ProtocolException if reading or inflating fails
     */
    private void readCompressedBuffer(ByteBuf compressedBuffer, ByteBuf inflated, int compressedSize) {
        final Inflater inflater = new Inflater();
        try (
                ByteBufOutputStream outputStream = new ByteBufOutputStream(inflated);
                InflaterOutputStream inflaterOutputStream = new InflaterOutputStream(outputStream, inflater)
        ) {
            compressedBuffer.readBytes(inflaterOutputStream, compressedSize);
        } catch (final IOException e) {
            final String message = String.format("Read Compressed Payload Size [%d] failed", compressedSize);
            throw new ProtocolException(message, e);
        } finally {
            // The stream does not end an Inflater that was supplied by the caller
            inflater.end();
        }
    }

    /**
     * Returns the cached sequence number, reading it from the buffer first when not yet known.
     *
     * @return the sequence number, or null when it is not yet readable
     */
    private Integer readSequenceNumber(ByteBuf buffer) {
        if (sequenceNumberRef.get() == null) {
            final Integer readSequenceNumber = readUnsignedInteger(buffer);
            if (readSequenceNumber == null) {
                log.trace("State [Read JSON] not enough readable bytes for Sequence Number");
            } else {
                sequenceNumberRef.set(readSequenceNumber);
            }
        }
        return sequenceNumberRef.get();
    }

    /**
     * Returns the cached payload size, reading it from the buffer first when not yet known.
     *
     * @return the payload size, or null when it is not yet readable
     */
    private Integer readPayloadSize(ByteBuf buffer) {
        if (payloadSizeRef.get() == null) {
            final Integer readPayloadSize = readUnsignedInteger(buffer);
            if (readPayloadSize == null) {
                log.trace("State [Read JSON] not enough readable bytes for Payload Size");
            } else {
                payloadSizeRef.set(readPayloadSize);
            }
        }
        return payloadSizeRef.get();
    }

    /**
     * Returns the cached compressed size, reading it from the buffer first when not yet known.
     *
     * @return the compressed size, or null when it is not yet readable
     */
    private Integer readCompressedSize(ByteBuf buffer) {
        if (compressedSizeRef.get() == null) {
            final Integer readCompressedSize = readUnsignedInteger(buffer);
            if (readCompressedSize == null) {
                log.trace("State [Read Compressed] not enough readable bytes for Compressed Size");
            } else {
                compressedSizeRef.set(readCompressedSize);
            }
        }
        return compressedSizeRef.get();
    }

    /**
     * Reads a four-byte unsigned integer when enough bytes are readable.
     *
     * @return the value, or null when fewer than four bytes are readable
     * @throws ProtocolException if the value does not fit in a signed {@code int}
     */
    private Integer readUnsignedInteger(ByteBuf buffer) {
        if (buffer.readableBytes() < INT_READABLE_BYTES) {
            return null;
        }

        final long unsigned = buffer.readUnsignedInt();
        if (unsigned > Integer.MAX_VALUE) {
            // Sizes and sequence numbers are handled as int; report oversized wire values as
            // a protocol violation rather than an arithmetic failure.
            final String message = String.format("Unsigned Integer [%d] exceeds maximum supported [%d]", unsigned, Integer.MAX_VALUE);
            throw new ProtocolException(message);
        }
        return (int) unsigned;
    }

    /**
     * Returns the cached frame type, reading the frame type code first when not yet known.
     *
     * @return the frame type, or null when the code byte is not yet readable
     */
    private FrameType readFrameType(ByteBuf buffer) {
        if (frameTypeRef.get() == null) {
            final int readableBytes = buffer.readableBytes();
            if (readableBytes >= CODE_READABLE_BYTES) {
                final byte frameTypeCode = buffer.readByte();
                final FrameType frameType = FRAME_TYPE_DECODER.readProtocolCode(frameTypeCode);
                frameTypeRef.set(frameType);
            } else {
                log.trace("State [Read Frame Type] not enough readable bytes [{}]", readableBytes);
            }
        }
        return frameTypeRef.get();
    }

    /**
     * Returns the cached protocol version, reading the version code first when not yet known.
     *
     * @return the protocol version, or null when the code byte is not yet readable
     */
    private ProtocolVersion readVersion(ByteBuf buffer) {
        if (versionRef.get() == null) {
            final int readableBytes = buffer.readableBytes();
            if (readableBytes >= CODE_READABLE_BYTES) {
                final byte versionCode = buffer.readByte();
                final ProtocolVersion protocolVersion = VERSION_DECODER.readProtocolCode(versionCode);
                versionRef.set(protocolVersion);
            } else {
                log.trace("State [Read Version] not enough readable bytes [{}]", readableBytes);
            }
        }
        return versionRef.get();
    }

    /** Clears the cached sequence number and payload size of the current JSON frame. */
    private void resetSequenceVersionPayloadSize() {
        sequenceNumberRef.set(null);
        payloadSizeRef.set(null);
    }

    /** Clears the cached frame type and protocol version so the next frame starts fresh. */
    private void resetFrameTypeVersion() {
        frameTypeRef.set(null);
        versionRef.set(null);
    }

    /** Restores the initial window size and discards the queued messages. */
    private void resetWindowSize() {
        windowSize.set(INITIAL_WINDOW_SIZE);
        batchMessages.clear();
    }
}

