package co.cask.cdap.messaging.server;

import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.io.ByteBuffers;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.RollbackDetail;
import co.cask.cdap.messaging.Schemas;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.HttpResponder;
import com.google.inject.Inject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

/**
 * HTTP handler exposing the {@code publish}, {@code store} and {@code rollback} operations of a
 * {@link MessagingService} for the topic identified by the {@code namespace} and {@code topic}
 * path parameters.
 *
 * <p>Request bodies (and the publish response body, when present) are Avro binary records that
 * are read and written with the schemas in {@link Schemas.V1}.
 */
@Path("/v1/namespaces/{namespace}/topics/{topic}")
public final class StoreHandler extends AbstractHttpHandler {
    /** Content type required on publish/store request bodies and set on publish response bodies. */
    private static final String AVRO_BINARY = "avro/binary";

    private final MessagingService messagingService;

    /**
     * {@link RollbackDetail} view backed by a decoded Avro record.
     *
     * <p>Every getter reads straight from the record: {@code transactionWritePointer} from the
     * top level, everything else from the nested {@code rollbackRange} record. A missing field
     * surfaces as a {@link NullPointerException} / {@link ClassCastException} at call time.
     */
    private static final class GenericRecordRollbackDetail implements RollbackDetail {
        private final GenericRecord record;

        private GenericRecordRollbackDetail(GenericRecord genericRecord) {
            this.record = genericRecord;
        }

        @Override
        public long getTransactionWritePointer() {
            return (Long) record.get("transactionWritePointer");
        }

        @Override
        public long getStartTimestamp() {
            return (Long) rollbackRange().get("startTimestamp");
        }

        @Override
        public int getStartSequenceId() {
            return (Integer) rollbackRange().get("startSequenceId");
        }

        @Override
        public long getEndTimestamp() {
            return (Long) rollbackRange().get("endTimestamp");
        }

        @Override
        public int getEndSequenceId() {
            return (Integer) rollbackRange().get("endSequenceId");
        }

        /** Returns the nested {@code rollbackRange} record that holds the range boundaries. */
        private GenericRecord rollbackRange() {
            return (GenericRecord) record.get("rollbackRange");
        }
    }

    /**
     * {@link StoreRequest} backed by a decoded Avro record.
     *
     * <p>The request is transactional iff the record's {@code transactionWritePointer} field is
     * non-null; when it is null the write pointer passed to the super class is {@code -1}. The
     * payloads are taken from the record's {@code messages} field.
     *
     * <p>NOTE(review): the decompiler reports this class was originally {@code private}; the
     * visibility is left as found so that no existing reference breaks.
     */
    public static final class GenericRecordStoreRequest extends StoreRequest {
        private final List<ByteBuffer> payloads;

        GenericRecordStoreRequest(TopicId topicId, GenericRecord genericRecord) {
            super(topicId, genericRecord.get("transactionWritePointer") != null, writePointerOf(genericRecord));
            // Assumes the request schema declares "messages" as an array of bytes, which Avro's
            // generic reader materializes as List<ByteBuffer> - TODO confirm against the schema.
            @SuppressWarnings("unchecked")
            List<ByteBuffer> messages = (List<ByteBuffer>) genericRecord.get("messages");
            this.payloads = messages;
        }

        /**
         * Extracts the transaction write pointer from the record, or {@code -1} when the field
         * is absent. Static so that it can be used inside the {@code super(...)} call.
         */
        private static long writePointerOf(GenericRecord genericRecord) {
            Object writePointer = genericRecord.get("transactionWritePointer");
            return writePointer == null ? -1L : Long.parseLong(writePointer.toString());
        }

        @Override
        public boolean hasPayload() {
            return !payloads.isEmpty();
        }

        @Override
        public Iterator<byte[]> iterator() {
            return payloads.stream().map(ByteBuffers::getByteArray).iterator();
        }
    }

    @Inject
    StoreHandler(MessagingService messagingService) {
        this.messagingService = messagingService;
    }

    /**
     * Publishes the messages carried in the request body to the topic.
     *
     * <p>Responds with {@code 200 OK} and no body when the messaging service returns no rollback
     * detail; otherwise responds with {@code 200 OK} and the Avro-encoded rollback detail.
     *
     * @param fullHttpRequest request whose body is an Avro binary publish request
     * @param httpResponder responder used to send the result
     * @param str value of the {@code namespace} path parameter
     * @param str2 value of the {@code topic} path parameter
     * @throws BadRequestException if the content type is not {@code avro/binary}, or the request
     *     is non-transactional and carries no payload
     * @throws Exception if decoding the body or the messaging service call fails
     */
    @POST
    @Path("/publish")
    public void publish(FullHttpRequest fullHttpRequest, HttpResponder httpResponder, @PathParam("namespace") String str, @PathParam("topic") String str2) throws Exception {
        TopicId topicId = new NamespaceId(str).topic(str2);
        StoreRequest storeRequest = createStoreRequest(topicId, fullHttpRequest);

        // An empty payload is meaningful only as part of a transactional publish.
        if (!storeRequest.isTransactional() && !storeRequest.hasPayload()) {
            throw new BadRequestException("Empty payload is only allowed for publishing transactional message. Topic: " + topicId);
        }

        RollbackDetail rollbackDetail = messagingService.publish(storeRequest);
        if (rollbackDetail == null) {
            httpResponder.sendStatus(HttpResponseStatus.OK);
            return;
        }
        httpResponder.sendContent(HttpResponseStatus.OK, encodeRollbackDetail(rollbackDetail),
                new DefaultHttpHeaders().set(HttpHeaderNames.CONTENT_TYPE, AVRO_BINARY));
    }

    /**
     * Stores the payload carried in the request body for the topic and responds with
     * {@code 200 OK}.
     *
     * @param fullHttpRequest request whose body is an Avro binary publish request
     * @param httpResponder responder used to send the result
     * @param str value of the {@code namespace} path parameter
     * @param str2 value of the {@code topic} path parameter
     * @throws BadRequestException if the content type is not {@code avro/binary}, or the request
     *     is not transactional, or it carries no payload
     * @throws Exception if decoding the body or the messaging service call fails
     */
    @POST
    @Path("/store")
    public void store(FullHttpRequest fullHttpRequest, HttpResponder httpResponder, @PathParam("namespace") String str, @PathParam("topic") String str2) throws Exception {
        TopicId topicId = new NamespaceId(str).topic(str2);
        StoreRequest storeRequest = createStoreRequest(topicId, fullHttpRequest);

        if (!storeRequest.isTransactional() || !storeRequest.hasPayload()) {
            throw new BadRequestException("Store request must be transactional with payload. Topic: " + topicId);
        }

        messagingService.storePayload(storeRequest);
        httpResponder.sendStatus(HttpResponseStatus.OK);
    }

    /**
     * Rolls back the messages described by the request body and responds with {@code 200 OK}.
     *
     * <p>The body is decoded with the publish <em>response</em> schema, i.e. the same record
     * shape that {@code publish} writes out. Unlike publish/store, the content type header is
     * not checked here.
     *
     * @param fullHttpRequest request whose body is an Avro binary rollback detail
     * @param httpResponder responder used to send the result
     * @param str value of the {@code namespace} path parameter
     * @param str2 value of the {@code topic} path parameter
     * @throws Exception if decoding the body or the messaging service call fails
     */
    @POST
    @Path("/rollback")
    public void rollback(FullHttpRequest fullHttpRequest, HttpResponder httpResponder, @PathParam("namespace") String str, @PathParam("topic") String str2) throws Exception {
        TopicId topicId = new NamespaceId(str).topic(str2);
        GenericRecord detailRecord = decodeRecord(Schemas.V1.PublishResponse.SCHEMA, fullHttpRequest);
        messagingService.rollback(topicId, new GenericRecordRollbackDetail(detailRecord));
        httpResponder.sendStatus(HttpResponseStatus.OK);
    }

    /**
     * Builds a {@link StoreRequest} from the Avro binary body of the given request.
     *
     * @throws BadRequestException if the content type header is not exactly {@code avro/binary}
     * @throws Exception if the body cannot be decoded with the publish request schema
     */
    private StoreRequest createStoreRequest(TopicId topicId, FullHttpRequest fullHttpRequest) throws Exception {
        String contentType = fullHttpRequest.headers().get(HttpHeaderNames.CONTENT_TYPE);
        if (!AVRO_BINARY.equals(contentType)) {
            throw new BadRequestException("Only avro/binary content type is supported.");
        }
        return new GenericRecordStoreRequest(topicId, decodeRecord(Schemas.V1.PublishRequest.SCHEMA, fullHttpRequest));
    }

    /** Decodes the request body as a single Avro binary record of the given schema. */
    private static GenericRecord decodeRecord(Schema schema, FullHttpRequest request) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(new ByteBufInputStream(request.content()), null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }

    /**
     * Encodes the given rollback detail as an Avro binary record of the publish response schema.
     *
     * @return a new buffer holding the encoded record
     * @throws IOException if the Avro writer fails
     */
    private ByteBuf encodeRollbackDetail(RollbackDetail rollbackDetail) throws IOException {
        Schema responseSchema = Schemas.V1.PublishResponse.SCHEMA;

        GenericData.Record range = new GenericData.Record(responseSchema.getField("rollbackRange").schema());
        range.put("startTimestamp", rollbackDetail.getStartTimestamp());
        range.put("startSequenceId", rollbackDetail.getStartSequenceId());
        range.put("endTimestamp", rollbackDetail.getEndTimestamp());
        range.put("endSequenceId", rollbackDetail.getEndSequenceId());

        GenericData.Record response = new GenericData.Record(responseSchema);
        response.put("transactionWritePointer", rollbackDetail.getTransactionWritePointer());
        response.put("rollbackRange", range);

        // 38 is only the initial capacity. NOTE(review): presumably sized to the expected
        // encoded length of one response record - confirm against the schema.
        ByteBuf encoded = Unpooled.buffer(38);
        // The direct encoder writes through to the stream without buffering, so no flush is needed.
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(new ByteBufOutputStream(encoded), null);
        new GenericDatumWriter<GenericRecord>(responseSchema).write(response, encoder);
        return encoded;
    }
}
