package co.cask.cdap.messaging.server;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.ByteBuffers;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.Schemas;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.BodyProducer;
import co.cask.http.HttpResponder;
import com.google.common.collect.ImmutableMultimap;
import com.google.inject.Inject;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.tephra.TransactionCodec;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/v1/namespaces/{namespace}/topics/{topic}")
/* loaded from: input_file:co/cask/cdap/messaging/server/FetchHandler.class */
public final class FetchHandler extends AbstractHttpHandler {
    private static final Logger LOG = LoggerFactory.getLogger(FetchHandler.class);
    private static final TransactionCodec TRANSACTION_CODEC = new TransactionCodec();
    private final MessagingService messagingService;
    private int messageChunkSize;

    /* loaded from: input_file:co/cask/cdap/messaging/server/FetchHandler$MessagesBodyProducer.class */
    private static class MessagesBodyProducer extends BodyProducer {
        private final CloseableIterator<RawMessage> iterator;
        private final int messageChunkSize;
        private final ChannelBuffer chunk;
        private final Encoder encoder;
        private boolean arrayStarted;
        private boolean arrayEnded;
        private final List<RawMessage> messages = new ArrayList();
        private final GenericRecord messageRecord = new GenericData.Record(Schemas.V1.ConsumeResponse.SCHEMA.getElementType());
        private final DatumWriter<GenericRecord> messageWriter = new GenericDatumWriter<GenericRecord>(Schemas.V1.ConsumeResponse.SCHEMA.getElementType()) { // from class: co.cask.cdap.messaging.server.FetchHandler.MessagesBodyProducer.1
            protected void writeBytes(Object obj, Encoder encoder) throws IOException {
                if (obj instanceof byte[]) {
                    encoder.writeBytes((byte[]) obj);
                } else {
                    super.writeBytes(obj, encoder);
                }
            }
        };

        MessagesBodyProducer(CloseableIterator<RawMessage> closeableIterator, int i) {
            this.iterator = closeableIterator;
            this.messageChunkSize = i;
            this.chunk = ChannelBuffers.dynamicBuffer(i);
            this.encoder = EncoderFactory.get().directBinaryEncoder(new ChannelBufferOutputStream(this.chunk), (BinaryEncoder) null);
        }

        public ChannelBuffer nextChunk() throws Exception {
            if (this.arrayEnded) {
                return ChannelBuffers.EMPTY_BUFFER;
            }
            this.chunk.clear();
            if (!this.arrayStarted) {
                this.arrayStarted = true;
                this.encoder.writeArrayStart();
            }
            int i = 0;
            this.messages.clear();
            while (this.iterator.hasNext() && i < this.messageChunkSize) {
                RawMessage rawMessage = (RawMessage) this.iterator.next();
                this.messages.add(rawMessage);
                i += rawMessage.getId().length + rawMessage.getPayload().length + 8;
            }
            this.encoder.setItemCount(this.messages.size());
            for (RawMessage rawMessage2 : this.messages) {
                this.encoder.startItem();
                this.messageRecord.put("id", rawMessage2.getId());
                this.messageRecord.put("payload", rawMessage2.getPayload());
                this.messageWriter.write(this.messageRecord, this.encoder);
            }
            if (!this.iterator.hasNext()) {
                this.arrayEnded = true;
                this.encoder.writeArrayEnd();
            }
            return this.chunk;
        }

        public void finished() throws Exception {
            this.iterator.close();
        }

        public void handleError(@Nullable Throwable th) {
            this.iterator.close();
            if (th instanceof SocketException) {
                FetchHandler.LOG.debug("Socket exception raised when sending messages back to client", th);
            } else {
                FetchHandler.LOG.warn("Exception raised when sending messages back to client", th);
            }
        }
    }

    @Inject
    FetchHandler(CConfiguration cConfiguration, MessagingService messagingService) {
        this.messagingService = messagingService;
        this.messageChunkSize = cConfiguration.getInt("messaging.http.server.consume.chunk.size");
    }

    @POST
    @Path("poll")
    public void poll(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace") String str, @PathParam("topic") String str2) throws Exception {
        TopicId topicId = new NamespaceId(str).topic(str2);
        if (!"avro/binary".equals(httpRequest.getHeader("Content-Type"))) {
            throw new BadRequestException("Only avro/binary content type is supported.");
        }
        CloseableIterator<RawMessage> fetchMessages = fetchMessages((GenericRecord) new GenericDatumReader(Schemas.V1.ConsumeRequest.SCHEMA).read((Object) null, DecoderFactory.get().directBinaryDecoder(new ChannelBufferInputStream(httpRequest.getContent()), (BinaryDecoder) null)), topicId);
        try {
            httpResponder.sendContent(HttpResponseStatus.OK, new MessagesBodyProducer(fetchMessages, this.messageChunkSize), ImmutableMultimap.of("Content-Type", "avro/binary"));
        } catch (Throwable th) {
            fetchMessages.close();
            throw th;
        }
    }

    private CloseableIterator<RawMessage> fetchMessages(GenericRecord genericRecord, TopicId topicId) throws IOException, TopicNotFoundException {
        MessageFetcher prepareFetch = this.messagingService.prepareFetch(topicId);
        Object obj = genericRecord.get("startFrom");
        if (obj != null) {
            if (obj instanceof ByteBuffer) {
                prepareFetch.setStartMessage(Bytes.toBytes((ByteBuffer) obj), ((Boolean) genericRecord.get("inclusive")).booleanValue());
            } else if (obj instanceof Long) {
                prepareFetch.setStartTime(((Long) obj).longValue());
            } else {
                LOG.warn("Ignore unrecognized type for startFrom. Type={}, Value={}", obj.getClass(), obj);
            }
        }
        Integer num = (Integer) genericRecord.get("limit");
        if (num != null) {
            prepareFetch.setLimit(num.intValue());
        }
        ByteBuffer byteBuffer = (ByteBuffer) genericRecord.get("transaction");
        if (byteBuffer != null) {
            prepareFetch.setTransaction(TRANSACTION_CODEC.decode(ByteBuffers.getByteArray(byteBuffer)));
        }
        return prepareFetch.fetch();
    }
}
