package co.cask.cdap.internal.app.runtime.monitor;

import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.messaging.MessagingContext;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LogSamplers;
import co.cask.cdap.common.logging.Loggers;
import co.cask.cdap.internal.app.runtime.monitor.MonitorSchemas;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.http.BodyProducer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/monitor/MessagesBodyProducer.class */
/**
 * A {@link BodyProducer} that streams messages fetched through a {@link MessagingContext} as an
 * Avro binary-encoded map. Each map key is a request key taken from the incoming request map and
 * each map value is an array of records built with the element schema of
 * {@code MonitorSchemas.V1.MonitorResponse.SCHEMA}'s value type.
 *
 * <p>The body is produced incrementally: every call to {@link #nextChunk()} re-uses one internal
 * buffer, writes the next portion of the encoding into it and returns a copy of that buffer.
 *
 * <p>NOTE(review): no synchronization is performed here; this presumably relies on the caller
 * invoking the callbacks from a single thread at a time - confirm against the HTTP library.
 */
final class MessagesBodyProducer extends BodyProducer {
    private static final Logger LOG = LoggerFactory.getLogger(MessagesBodyProducer.class);
    // Rate-limited logger (limit argument is 60000, presumably milliseconds) for repetitive problems.
    private static final Logger OUTAGE_LOG =
        Loggers.sampling(LOG, LogSamplers.perMessage(() -> LogSamplers.limitRate(60000L)));
    // Configuration key holding the chunk size used both for the buffer and the batching threshold.
    private static final String CHUNK_SIZE_KEY = "app.program.runtime.monitor.server.consume.chunk.size";
    // Separator between the configuration key part and the literal suffix of a request key.
    private static final char TOPIC_SUFFIX_SEPARATOR = ':';

    private final Iterator<Map.Entry<String, GenericRecord>> requestsIterator;
    private final MessagingContext messagingContext;
    private final CConfiguration cConf;
    private final int numOfRequests;
    private final int messageChunkSize;
    private final ByteBuf chunk;
    private final Encoder encoder;
    // Iterator over the messages of the request currently being written; null before the first request.
    private Iterator<Message> iterator;
    private boolean mapStarted;
    private boolean mapEnded;
    // NOTE: elementSchema must stay declared before messageWriter, whose initializer reads it.
    private final Schema elementSchema = MonitorSchemas.V1.MonitorResponse.SCHEMA.getValueType().getElementType();
    // Records staged for the chunk currently being built; emptied at the end of every sendMessages().
    private final Deque<GenericRecord> monitorMessages = new LinkedList<>();
    private final DatumWriter<GenericRecord> messageWriter = new GenericDatumWriter<GenericRecord>(this.elementSchema) {
        /**
         * Writes raw {@code byte[]} values directly; every other value is delegated to the
         * default implementation.
         */
        protected void writeBytes(Object obj, Encoder encoder) throws IOException {
            if (obj instanceof byte[]) {
                encoder.writeBytes((byte[]) obj);
            } else {
                super.writeBytes(obj, encoder);
            }
        }
    };

    /**
     * Creates a producer for the given set of fetch requests.
     *
     * @param cConfiguration configuration used to read the chunk size and to resolve request keys
     *     into topic names
     * @param map requests keyed by topic configuration key; each value is read for its
     *     {@code "limit"} and {@code "messageId"} fields
     * @param messagingContext source of the message fetcher used to read messages
     */
    public MessagesBodyProducer(CConfiguration cConfiguration, Map<String, GenericRecord> map, MessagingContext messagingContext) {
        this.requestsIterator = map.entrySet().iterator();
        this.messagingContext = messagingContext;
        this.cConf = cConfiguration;
        this.numOfRequests = map.size();
        this.messageChunkSize = cConfiguration.getInt(CHUNK_SIZE_KEY);
        this.chunk = Unpooled.buffer(this.messageChunkSize);
        // The encoder writes straight into the re-usable chunk buffer.
        this.encoder = EncoderFactory.get().directBinaryEncoder(new ByteBufOutputStream(this.chunk), (BinaryEncoder) null);
    }

    /**
     * Produces the next portion of the encoded response.
     *
     * @return {@link Unpooled#EMPTY_BUFFER} once the map end has been written (presumably the
     *     end-of-body signal for the caller - confirm against the HTTP library); otherwise a copy
     *     of the internal buffer holding whatever was encoded during this call
     * @throws Exception if fetching or encoding messages fails
     */
    public ByteBuf nextChunk() throws Exception {
        if (this.mapEnded) {
            return Unpooled.EMPTY_BUFFER;
        }
        // Re-use the same buffer for every chunk; the caller receives an independent copy.
        this.chunk.clear();
        this.iterator = getIterator();
        if (this.iterator != null && this.iterator.hasNext()) {
            sendMessages();
        }
        return this.chunk.copy();
    }

    /**
     * Releases the internal chunk buffer.
     */
    public void finished() {
        releaseChunk();
    }

    /**
     * Closes the current message iterator, releases the internal chunk buffer and logs the error
     * through the rate-limited logger.
     *
     * @param th the failure cause, possibly {@code null}
     */
    public void handleError(@Nullable Throwable th) {
        closeIterator(this.iterator);
        // Previously only finished() released the buffer, so the error path never released it.
        releaseChunk();
        OUTAGE_LOG.error("Error occurred while sending chunks from Runtime Handler", th);
    }

    /**
     * Releases the chunk buffer unless its reference count already dropped to zero, so that it
     * is safe to reach this from both {@link #finished()} and {@link #handleError(Throwable)}.
     */
    private void releaseChunk() {
        if (this.chunk.refCnt() > 0) {
            this.chunk.release();
        }
    }

    /**
     * Advances the encoding state and returns the iterator to read messages from.
     *
     * <p>Writes the map start on the first call. When there is no current iterator or it is
     * exhausted, the array of the finished request (if any) is ended, and then either the next
     * request's array is started and its messages fetched, or - when no request is left - the map
     * end is written.
     *
     * @return the current message iterator; may be {@code null} or exhausted
     * @throws IOException if encoding or fetching fails
     */
    private Iterator<Message> getIterator() throws IOException {
        if (!this.mapStarted) {
            startMap();
        }
        boolean currentHasMore = this.iterator != null && this.iterator.hasNext();
        if (currentHasMore) {
            return this.iterator;
        }
        if (this.iterator != null) {
            endArray();
        }
        if (this.requestsIterator.hasNext()) {
            Map.Entry<String, GenericRecord> request = this.requestsIterator.next();
            startArray(request.getKey());
            this.iterator = fetchMessages(request);
        } else {
            endMap();
        }
        return this.iterator;
    }

    /**
     * Fetches the messages for one request entry.
     *
     * @param entry request key together with the record carrying {@code "limit"} and
     *     {@code "messageId"}
     * @return an iterator over the fetched messages, or an empty iterator when the key resolves
     *     to no topic or the topic is not found
     * @throws IOException if the fetch fails
     */
    private Iterator<Message> fetchMessages(Map.Entry<String, GenericRecord> entry) throws IOException {
        String topic = getTopic(entry.getKey());
        if (topic == null) {
            OUTAGE_LOG.warn("Ignoring topic config '{}' in the request that does not map to any local topic", entry.getKey());
            return Collections.emptyIterator();
        }
        try {
            GenericRecord request = entry.getValue();
            int limit = ((Integer) request.get("limit")).intValue();
            Object messageId = request.get("messageId");
            String fromMessageId = messageId == null ? null : messageId.toString();
            return this.messagingContext.getMessageFetcher()
                .fetch(NamespaceId.SYSTEM.getNamespace(), topic, limit, fromMessageId);
        } catch (TopicNotFoundException e) {
            OUTAGE_LOG.warn("Ignoring topic '{}:{}' that does not exists locally", e.getNamespace(), e.getTopic());
            return Collections.emptyIterator();
        }
    }

    /**
     * Moves messages from the current iterator into {@code monitorMessages} until the iterator is
     * exhausted or the running size estimate reaches the configured chunk size.
     */
    private void bufferMessagesToSend() {
        int estimatedBytes = 0;
        while (this.iterator.hasNext() && estimatedBytes < this.messageChunkSize) {
            Message message = this.iterator.next();
            this.monitorMessages.addLast(createGenericRecord(message));
            // Estimate: id length + payload length + a fixed allowance of 8
            // (presumably encoding overhead per message - TODO confirm).
            estimatedBytes += message.getId().length() + message.getPayload().length + 8;
        }
    }

    /** Writes the map start followed by the number of requests as the item count. */
    private void startMap() throws IOException {
        this.encoder.writeMapStart();
        this.encoder.setItemCount(this.numOfRequests);
        this.mapStarted = true;
    }

    /**
     * Starts a new map item: writes the key and the start of its array value.
     *
     * @param str the map key to write
     */
    private void startArray(String str) throws IOException {
        this.encoder.startItem();
        this.encoder.writeString(str);
        this.encoder.writeArrayStart();
    }

    /** Closes the current message iterator and writes the array end. */
    private void endArray() throws IOException {
        closeIterator(this.iterator);
        this.encoder.writeArrayEnd();
    }

    /** Writes the map end and marks the response as complete. */
    private void endMap() throws IOException {
        this.encoder.writeMapEnd();
        this.mapEnded = true;
    }

    /**
     * Stages a batch of messages and writes it to the encoder as one counted group of array
     * items, then empties the staging queue.
     */
    private void sendMessages() throws IOException {
        bufferMessagesToSend();
        this.encoder.setItemCount(this.monitorMessages.size());
        for (GenericRecord record : this.monitorMessages) {
            this.encoder.startItem();
            this.messageWriter.write(record, this.encoder);
        }
        this.monitorMessages.clear();
    }

    /**
     * Wraps a message into a record of the element schema.
     *
     * @param message the message whose id and payload are copied into the record
     * @return a record with {@code "messageId"} and {@code "message"} populated
     */
    private GenericRecord createGenericRecord(Message message) {
        GenericData.Record record = new GenericData.Record(this.elementSchema);
        record.put("messageId", message.getId());
        record.put("message", message.getPayload());
        return record;
    }

    /**
     * Resolves a request key into a topic name using the configuration.
     *
     * <p>A key without {@code ':'} is looked up directly. Otherwise the part before the last
     * {@code ':'} is looked up and the part after it is appended to the configured value.
     *
     * @param str the request key
     * @return the resolved topic name, or {@code null} if the configuration has no value for the key
     */
    @Nullable
    private String getTopic(String str) {
        int separatorIndex = str.lastIndexOf(TOPIC_SUFFIX_SEPARATOR);
        if (separatorIndex < 0) {
            return this.cConf.get(str);
        }
        String topicPrefix = this.cConf.get(str.substring(0, separatorIndex));
        if (topicPrefix == null) {
            return null;
        }
        return topicPrefix + str.substring(separatorIndex + 1);
    }

    /**
     * Closes the given iterator if it is {@link AutoCloseable}; a failure to close is logged and
     * otherwise ignored. A {@code null} argument is a no-op.
     *
     * @param it the iterator to close, possibly {@code null}
     */
    private void closeIterator(Iterator<?> it) {
        if (!(it instanceof AutoCloseable)) {
            return;
        }
        try {
            ((AutoCloseable) it).close();
        } catch (Exception e) {
            // Best effort: closing is cleanup only, so the failure must not mask the primary flow.
            LOG.warn("Exception raised when closing iterator", e);
        }
    }
}
