/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.beats.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.beats.protocol.Batch;
import org.apache.nifi.processors.beats.protocol.BatchMessage;
import org.apache.nifi.processors.beats.protocol.MessageAck;

@ChannelHandler.Sharable
public class BatchChannelInboundHandler
extends SimpleChannelInboundHandler<Batch> {
    private final ComponentLog log;
    private final BlockingQueue<BatchMessage> messages;

    public BatchChannelInboundHandler(ComponentLog log, BlockingQueue<BatchMessage> messages) {
        this.log = Objects.requireNonNull(log, "Component Log required");
        this.messages = Objects.requireNonNull(messages, "Message Queue required");
    }

    protected void channelRead0(ChannelHandlerContext context, Batch batch) {
        Integer lastSequenceNumber = null;
        Collection<BatchMessage> batchMessages = batch.getMessages();
        int queued = 0;
        for (BatchMessage batchMessage : batchMessages) {
            int sequenceNumber = batchMessage.getSequenceNumber();
            String sender = batchMessage.getSender();
            if (this.messages.offer(batchMessage)) {
                this.log.debug("Message Sequence Number [{}] Sender [{}] queued", new Object[]{sequenceNumber, sender});
                lastSequenceNumber = batchMessage.getSequenceNumber();
                ++queued;
                continue;
            }
            this.log.warn("Message Sequence Number [{}] Sender [{}] queuing failed: Queued [{}] of [{}]", new Object[]{sequenceNumber, sender, queued, batchMessages.size()});
            break;
        }
        if (lastSequenceNumber == null) {
            this.log.warn("Batch Messages [{}] queuing failed", new Object[]{batch.getMessages().size()});
        } else {
            MessageAck messageAck = new MessageAck(lastSequenceNumber);
            context.writeAndFlush((Object)messageAck);
        }
    }
}

