package io.confluent.kafkarest.integration.v3;

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.BinaryNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.confluent.kafka.serializers.KafkaJsonDeserializer;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.v3.ProduceRequest;
import io.confluent.kafkarest.entities.v3.ProduceResponse;
import io.confluent.kafkarest.exceptions.v3.ErrorResponse;
import io.confluent.kafkarest.integration.ClusterTestHarness;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/confluent/kafkarest/integration/v3/ProduceActionNoSchemaIntegrationTest.class */
public class ProduceActionNoSchemaIntegrationTest extends ClusterTestHarness {
    /** Name of the topic created in {@link #setUp()} and targeted by every produce request here. */
    private static final String TOPIC_NAME = "topic-1";
    /** Partition count passed to {@code createTopic} when the test topic is created. */
    private static final int NUM_PARTITIONS = 3;

    /**
     * Configures the underlying {@code ClusterTestHarness}.
     *
     * <p>NOTE(review): the arguments {@code (1, false)} presumably mean "one broker, no schema
     * registry" - confirm against the {@code ClusterTestHarness} constructor.
     */
    public ProduceActionNoSchemaIntegrationTest() {
        super(1, false);
    }

    /**
     * Runs the harness set-up and then creates {@link #TOPIC_NAME} with {@link #NUM_PARTITIONS}
     * partitions before each test.
     *
     * @throws Exception if the harness set-up or the topic creation fails
     */
    @BeforeEach
    @Override
    public void setUp() throws Exception {
        super.setUp();
        // NOTE(review): the trailing short is presumably the replication factor - confirm.
        createTopic(TOPIC_NAME, NUM_PARTITIONS, (short) 1);
    }

    /**
     * Posts a record whose key and value are both BINARY, expects HTTP 200, and checks that the
     * record fetched at the reported partition/offset carries the same bytes.
     */
    @Test
    public void produceBinary() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        ByteString expectedKey = ByteString.copyFromUtf8("foo");
        ByteString expectedValue = ByteString.copyFromUtf8("bar");

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedKey.toByteArray()))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedValue.toByteArray()))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new ByteArrayDeserializer(),
                        new ByteArrayDeserializer());
        Assertions.assertEquals(expectedKey, ByteString.copyFrom((byte[]) stored.key()));
        Assertions.assertEquals(expectedValue, ByteString.copyFrom((byte[]) stored.value()));
    }

    /**
     * Posts a BINARY record whose key and value data are JSON null, expects HTTP 200, and checks
     * that the fetched record has a null key and a null value.
     */
    @Test
    public void produceBinaryWithNullData() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new ByteArrayDeserializer(),
                        new ByteArrayDeserializer());
        Assertions.assertNull(stored.key());
        Assertions.assertNull(stored.value());
    }

    /**
     * Posts a BINARY record whose key is an integer node and whose value is the text "fooba".
     * The HTTP status is still expected to be 200; the failure is asserted through the error code
     * 400 carried in the {@link ErrorResponse} body.
     */
    @Test
    public void produceBinaryWithInvalidData_throwsBadRequest() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(IntNode.valueOf(1))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(TextNode.valueOf("fooba"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assertions.assertEquals(400, error.getErrorCode());
    }

    /**
     * Posts a record with STRING key "foo" and STRING value "bar", expects HTTP 200, and checks
     * that the fetched record decodes to the same strings.
     */
    @Test
    public void produceString() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new StringDeserializer(),
                        new StringDeserializer());
        Assertions.assertEquals("foo", stored.key());
        Assertions.assertEquals("bar", stored.value());
    }

    /**
     * Same as the plain STRING produce test, but the request entity is sent with the media type
     * {@code application/json; charset=UTF-8}; the record must still round-trip as "foo"/"bar".
     */
    @Test
    public void produceStringCharsetUtf8() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json; charset=UTF-8"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new StringDeserializer(),
                        new StringDeserializer());
        Assertions.assertEquals("foo", stored.key());
        Assertions.assertEquals("bar", stored.value());
    }

    /**
     * Same as the plain STRING produce test, but the request entity is sent with the media type
     * {@code application/json; charset=ISO-8859-1}; the record must still round-trip as
     * "foo"/"bar".
     */
    @Test
    public void produceStringCharsetIso88591() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json; charset=ISO-8859-1"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new StringDeserializer(),
                        new StringDeserializer());
        Assertions.assertEquals("foo", stored.key());
        Assertions.assertEquals("bar", stored.value());
    }

    /**
     * Sends a STRING produce request with the media type
     * {@code application/json; charset=DEADBEEF} and expects the HTTP status to be 400.
     */
    @Test
    public void produceStringCharsetInvalid() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json; charset=DEADBEEF"));
        Assertions.assertEquals(
                Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
    }

    /**
     * Posts a STRING record whose key and value are both the empty string, expects HTTP 200, and
     * checks that the fetched record's key and value are empty strings.
     */
    @Test
    public void produceStringWithEmptyData() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf(""))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf(""))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new StringDeserializer(),
                        new StringDeserializer());
        String storedKey = (String) stored.key();
        Assertions.assertTrue(storedKey.isEmpty());
        String storedValue = (String) stored.value();
        Assertions.assertTrue(storedValue.isEmpty());
    }

    /**
     * Posts a STRING record whose key and value data are JSON null, expects HTTP 200, and checks
     * that the fetched record has a null key and a null value.
     */
    @Test
    public void produceStringWithNullData() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new StringDeserializer(),
                        new StringDeserializer());
        Assertions.assertNull(stored.key());
        Assertions.assertNull(stored.value());
    }

    /**
     * Posts a hand-written JSON body containing an unknown top-level field {@code records}.
     * The HTTP status is still expected to be 200; the body must be an {@link ErrorResponse} with
     * error code 400 and the exact "Unrecognized field" message asserted below.
     */
    @Test
    public void produceWithInvalidData_throwsBadRequest() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        String malformedBody = "{ \"records\": {\"subject\": \"foobar\" } }";

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(malformedBody, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assertions.assertEquals(400, error.getErrorCode());
        // The expected text is compared verbatim, including the unbalanced "])" at the end.
        Assertions.assertEquals(
                "Unrecognized field \"records\" (class io.confluent.kafkarest.entities.v3.AutoValue_ProduceRequest$Builder), not marked as ignorable (6 known properties: \"value\", \"originalSize\", \"partitionId\", \"headers\", \"key\", \"timestamp\"])",
                error.getMessage());
    }

    /**
     * Posts a record with JSON key "foo" and JSON value "bar", expects HTTP 200, and checks that
     * the fetched record, decoded with a {@link KafkaJsonDeserializer}, yields the same values.
     */
    @Test
    public void produceJson() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSON)
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSON)
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        // One deserializer instance, configured once, is used for both key and value.
        KafkaJsonDeserializer<Object> jsonDeserializer = new KafkaJsonDeserializer<>();
        jsonDeserializer.configure(Collections.emptyMap(), false);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        jsonDeserializer,
                        jsonDeserializer);
        Assertions.assertEquals("foo", stored.key());
        Assertions.assertEquals("bar", stored.value());
    }

    /**
     * Posts a JSON record whose key and value data are JSON null, expects HTTP 200, and checks
     * that the fetched record, decoded with a {@link KafkaJsonDeserializer}, has a null key and a
     * null value.
     */
    @Test
    public void produceJsonWithNullData() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSON)
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSON)
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        // One deserializer instance, configured once, is used for both key and value.
        KafkaJsonDeserializer<Object> jsonDeserializer = new KafkaJsonDeserializer<>();
        jsonDeserializer.configure(Collections.emptyMap(), false);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        jsonDeserializer,
                        jsonDeserializer);
        Assertions.assertNull(stored.key());
        Assertions.assertNull(stored.value());
    }

    /**
     * Posts a BINARY record with partition id 1 set on the request, expects HTTP 200, and checks
     * that the record fetched from partition 1 at the reported offset carries the same bytes.
     */
    @Test
    public void produceBinaryWithPartitionId() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        ByteString expectedKey = ByteString.copyFromUtf8("foo");
        ByteString expectedValue = ByteString.copyFromUtf8("bar");

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedKey.toByteArray()))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedValue.toByteArray()))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setPartitionId(1)
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        // The partition is the one requested above, not the one reported in the response.
        long producedOffset = readProduceResponse(response).getOffset().longValue();
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        1,
                        producedOffset,
                        new ByteArrayDeserializer(),
                        new ByteArrayDeserializer());
        Assertions.assertEquals(expectedKey, ByteString.copyFrom((byte[]) stored.key()));
        Assertions.assertEquals(expectedValue, ByteString.copyFrom((byte[]) stored.value()));
    }

    /**
     * Posts a BINARY record with an explicit timestamp of epoch millisecond 1000, expects
     * HTTP 200, and checks that the fetched record carries the same bytes and the same timestamp.
     */
    @Test
    public void produceBinaryWithTimestamp() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        Instant expectedTimestamp = Instant.ofEpochMilli(1000L);
        ByteString expectedKey = ByteString.copyFromUtf8("foo");
        ByteString expectedValue = ByteString.copyFromUtf8("bar");

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedKey.toByteArray()))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedValue.toByteArray()))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setTimestamp(expectedTimestamp)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new ByteArrayDeserializer(),
                        new ByteArrayDeserializer());
        Assertions.assertEquals(expectedKey, ByteString.copyFrom((byte[]) stored.key()));
        Assertions.assertEquals(expectedValue, ByteString.copyFrom((byte[]) stored.value()));
        Assertions.assertEquals(expectedTimestamp, Instant.ofEpochMilli(stored.timestamp()));
    }

    /**
     * Posts a BINARY record with three headers (two named "header-1", one named "header-2"),
     * expects HTTP 200, and checks that the fetched record carries the same bytes and that the
     * headers come back grouped by name with their values in the order they were sent.
     */
    @Test
    public void produceBinaryWithHeaders() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        ByteString expectedKey = ByteString.copyFromUtf8("foo");
        ByteString expectedValue = ByteString.copyFromUtf8("bar");

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedKey.toByteArray()))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedValue.toByteArray()))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setHeaders(
                                Arrays.asList(
                                        ProduceRequest.ProduceRequestHeader.create(
                                                "header-1", ByteString.copyFromUtf8("value-1")),
                                        ProduceRequest.ProduceRequestHeader.create(
                                                "header-1", ByteString.copyFromUtf8("value-2")),
                                        ProduceRequest.ProduceRequestHeader.create(
                                                "header-2", ByteString.copyFromUtf8("value-3"))))
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new ByteArrayDeserializer(),
                        new ByteArrayDeserializer());
        Assertions.assertEquals(expectedKey, ByteString.copyFrom((byte[]) stored.key()));
        Assertions.assertEquals(expectedValue, ByteString.copyFrom((byte[]) stored.value()));
        Assertions.assertEquals(
                Arrays.asList(
                        new RecordHeader(
                                "header-1", ByteString.copyFromUtf8("value-1").toByteArray()),
                        new RecordHeader(
                                "header-1", ByteString.copyFromUtf8("value-2").toByteArray())),
                ImmutableList.copyOf(stored.headers().headers("header-1")));
        Assertions.assertEquals(
                Collections.singletonList(
                        new RecordHeader(
                                "header-2", ByteString.copyFromUtf8("value-3").toByteArray())),
                ImmutableList.copyOf(stored.headers().headers("header-2")));
    }

    /**
     * Posts a request that sets only a BINARY key (no value), expects HTTP 200, and checks that
     * the fetched record has the same key bytes and a null value.
     */
    @Test
    public void produceBinaryKeyOnly() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        ByteString expectedKey = ByteString.copyFromUtf8("foo");

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedKey.toByteArray()))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder().setKey(keyData).setOriginalSize(0L).build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new ByteArrayDeserializer(),
                        new ByteArrayDeserializer());
        Assertions.assertEquals(expectedKey, ByteString.copyFrom((byte[]) stored.key()));
        Assertions.assertNull(stored.value());
    }

    /**
     * Posts a request that sets only a BINARY value (no key), expects HTTP 200, and checks that
     * the fetched record has a null key and the same value bytes.
     */
    @Test
    public void produceBinaryValueOnly() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        ByteString expectedValue = ByteString.copyFromUtf8("bar");

        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedValue.toByteArray()))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder().setValue(valueData).setOriginalSize(0L).build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new ByteArrayDeserializer(),
                        new ByteArrayDeserializer());
        Assertions.assertNull(stored.key());
        Assertions.assertEquals(expectedValue, ByteString.copyFrom((byte[]) stored.value()));
    }

    /**
     * Posts a STRING record with partition id 1 set on the request, expects HTTP 200, and checks
     * that the record fetched from partition 1 at the reported offset decodes to "foo"/"bar".
     */
    @Test
    public void produceStringWithPartitionId() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setPartitionId(1)
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        // The partition is the one requested above, not the one reported in the response.
        long producedOffset = readProduceResponse(response).getOffset().longValue();
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        1,
                        producedOffset,
                        new StringDeserializer(),
                        new StringDeserializer());
        Assertions.assertEquals("foo", stored.key());
        Assertions.assertEquals("bar", stored.value());
    }

    /**
     * Posts a STRING record with an explicit timestamp of epoch millisecond 1000, expects
     * HTTP 200, and checks that the fetched record decodes to "foo"/"bar" with the same timestamp.
     */
    @Test
    public void produceStringWithTimestamp() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        Instant expectedTimestamp = Instant.ofEpochMilli(1000L);

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setKey(keyData)
                        .setValue(valueData)
                        .setTimestamp(expectedTimestamp)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new StringDeserializer(),
                        new StringDeserializer());
        Assertions.assertEquals("foo", stored.key());
        Assertions.assertEquals("bar", stored.value());
        Assertions.assertEquals(expectedTimestamp, Instant.ofEpochMilli(stored.timestamp()));
    }

    /**
     * Posts a STRING record with three headers (two named "header-1", one named "header-2"),
     * expects HTTP 200, and checks that the fetched record decodes to "foo"/"bar" and that the
     * headers come back grouped by name with their values in the order they were sent.
     */
    @Test
    public void produceStringWithHeaders() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder()
                        .setHeaders(
                                Arrays.asList(
                                        ProduceRequest.ProduceRequestHeader.create(
                                                "header-1", ByteString.copyFromUtf8("value-1")),
                                        ProduceRequest.ProduceRequestHeader.create(
                                                "header-1", ByteString.copyFromUtf8("value-2")),
                                        ProduceRequest.ProduceRequestHeader.create(
                                                "header-2", ByteString.copyFromUtf8("value-3"))))
                        .setKey(keyData)
                        .setValue(valueData)
                        .setOriginalSize(0L)
                        .build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new StringDeserializer(),
                        new StringDeserializer());
        Assertions.assertEquals("foo", stored.key());
        Assertions.assertEquals("bar", stored.value());
        Assertions.assertEquals(
                Arrays.asList(
                        new RecordHeader(
                                "header-1", ByteString.copyFromUtf8("value-1").toByteArray()),
                        new RecordHeader(
                                "header-1", ByteString.copyFromUtf8("value-2").toByteArray())),
                ImmutableList.copyOf(stored.headers().headers("header-1")));
        Assertions.assertEquals(
                Collections.singletonList(
                        new RecordHeader(
                                "header-2", ByteString.copyFromUtf8("value-3").toByteArray())),
                ImmutableList.copyOf(stored.headers().headers("header-2")));
    }

    /**
     * Posts a request that sets only a STRING key (no value), expects HTTP 200, and checks that
     * the fetched record has key "foo" and a null value.
     */
    @Test
    public void produceStringKeyOnly() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder().setKey(keyData).setOriginalSize(0L).build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        // The value side is read with a byte-array deserializer; it is only checked for null.
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new StringDeserializer(),
                        new ByteArrayDeserializer());
        Assertions.assertEquals("foo", stored.key());
        Assertions.assertNull(stored.value());
    }

    /**
     * Posts a request that sets only a STRING value (no key), expects HTTP 200, and checks that
     * the fetched record has a null key and value "bar".
     */
    @Test
    public void produceStringValueOnly() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.STRING)
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest payload =
                ProduceRequest.builder().setValue(valueData).setOriginalSize(0L).build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new StringDeserializer(),
                        new StringDeserializer());
        Assertions.assertNull(stored.key());
        Assertions.assertEquals("bar", stored.value());
    }

    /**
     * Posts a request with neither key nor value set, expects HTTP 200, and checks that the
     * fetched record has a null key and a null value.
     */
    @Test
    public void produceNothing() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        ProduceRequest payload = ProduceRequest.builder().setOriginalSize(0L).build();

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(payload, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                getMessage(
                        TOPIC_NAME,
                        produced.getPartitionId().intValue(),
                        produced.getOffset().longValue(),
                        new ByteArrayDeserializer(),
                        new ByteArrayDeserializer());
        Assertions.assertNull(stored.key());
        Assertions.assertNull(stored.value());
    }

    /**
     * Sends 100 JSON produce requests (keys "key-i", values "value-i", all with partition id 0)
     * as one request body made of the serialized requests concatenated back to back. Expects
     * HTTP 200, then checks that 100 records are fetched and that the i-th fetched record matches
     * the i-th response's partition/offset and the i-th request's key/value text.
     */
    @Test
    public void produceJsonBatch() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ArrayList<ProduceRequest> requests = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            ProduceRequest.ProduceRequestData keyData =
                    ProduceRequest.ProduceRequestData.builder()
                            .setFormat(EmbeddedFormat.JSON)
                            .setData(TextNode.valueOf("key-" + i))
                            .build();
            ProduceRequest.ProduceRequestData valueData =
                    ProduceRequest.ProduceRequestData.builder()
                            .setFormat(EmbeddedFormat.JSON)
                            .setData(TextNode.valueOf("value-" + i))
                            .build();
            requests.add(
                    ProduceRequest.builder()
                            .setPartitionId(0)
                            .setKey(keyData)
                            .setValue(valueData)
                            .setOriginalSize(0L)
                            .build());
        }

        // The batch body is the JSON documents appended one after another, with no separator.
        StringBuilder batchBody = new StringBuilder();
        ObjectMapper mapper = getObjectMapper();
        for (ProduceRequest produceRequest : requests) {
            batchBody.append(mapper.writeValueAsString(produceRequest));
        }

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(batchBody.toString(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ImmutableList<ProduceResponse> produced = readProduceResponses(response);
        KafkaJsonDeserializer<Object> jsonDeserializer = new KafkaJsonDeserializer<>();
        jsonDeserializer.configure(Collections.emptyMap(), false);
        ConsumerRecords<?, ?> stored =
                getMessages(TOPIC_NAME, jsonDeserializer, jsonDeserializer, 100);
        Assertions.assertEquals(100, stored.count());

        // The count check above guarantees this loop visits exactly 100 records.
        int index = 0;
        for (ConsumerRecord<?, ?> consumed : stored) {
            ProduceResponse reported = produced.get(index);
            ProduceRequest sent = requests.get(index);
            Assertions.assertEquals(reported.getPartitionId(), consumed.partition());
            Assertions.assertEquals(reported.getOffset(), consumed.offset());
            Assertions.assertEquals(
                    sent.getKey().map(data -> data.getData()).map(node -> node.asText()).orElse(null),
                    consumed.key());
            Assertions.assertEquals(
                    sent.getValue()
                            .map(data -> data.getData())
                            .map(node -> node.asText())
                            .orElse(null),
                    consumed.value());
            index++;
        }
    }

    /**
     * Sends 100 STRING produce requests (keys "key-i", values "value-i", all with partition id 0)
     * as one request body made of the serialized requests concatenated back to back. Expects
     * HTTP 200, then checks that 100 records are fetched and that the i-th fetched record matches
     * the i-th response's partition/offset and the i-th request's key/value text.
     */
    @Test
    public void produceStringBatch() throws Exception {
        String recordsPath =
                "/v3/clusters/" + getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ArrayList<ProduceRequest> requests = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            ProduceRequest.ProduceRequestData keyData =
                    ProduceRequest.ProduceRequestData.builder()
                            .setFormat(EmbeddedFormat.STRING)
                            .setData(TextNode.valueOf("key-" + i))
                            .build();
            ProduceRequest.ProduceRequestData valueData =
                    ProduceRequest.ProduceRequestData.builder()
                            .setFormat(EmbeddedFormat.STRING)
                            .setData(TextNode.valueOf("value-" + i))
                            .build();
            requests.add(
                    ProduceRequest.builder()
                            .setPartitionId(0)
                            .setKey(keyData)
                            .setValue(valueData)
                            .setOriginalSize(0L)
                            .build());
        }

        // The batch body is the JSON documents appended one after another, with no separator.
        StringBuilder batchBody = new StringBuilder();
        ObjectMapper mapper = getObjectMapper();
        for (ProduceRequest produceRequest : requests) {
            batchBody.append(mapper.writeValueAsString(produceRequest));
        }

        Response response =
                request(recordsPath)
                        .accept("application/json")
                        .post(Entity.entity(batchBody.toString(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ImmutableList<ProduceResponse> produced = readProduceResponses(response);
        StringDeserializer stringDeserializer = new StringDeserializer();
        ConsumerRecords<?, ?> stored =
                getMessages(TOPIC_NAME, stringDeserializer, stringDeserializer, 100);
        Assertions.assertEquals(100, stored.count());

        // The count check above guarantees this loop visits exactly 100 records.
        int index = 0;
        for (ConsumerRecord<?, ?> consumed : stored) {
            ProduceResponse reported = produced.get(index);
            ProduceRequest sent = requests.get(index);
            Assertions.assertEquals(reported.getPartitionId(), consumed.partition());
            Assertions.assertEquals(reported.getOffset(), consumed.offset());
            Assertions.assertEquals(
                    sent.getKey()
                            .map(data -> data.getData())
                            .map(node -> node.textValue())
                            .orElse(null),
                    consumed.key());
            Assertions.assertEquals(
                    sent.getValue()
                            .map(data -> data.getData())
                            .map(node -> node.textValue())
                            .orElse(null),
                    consumed.value());
            index++;
        }
    }

    /**
     * Posts 100 BINARY-format produce requests whose key and value data are integer nodes, and
     * expects the HTTP status to be OK while each of the first 100 entries of the response body
     * is an error with code 400.
     */
    @Test
    public void produceBinaryBatchWithInvalidData_throwsMultipleBadRequests() throws Exception {
        final int batchSize = 100;
        String clusterId = getClusterId();

        ArrayList<ProduceRequest> requests = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            requests.add(
                ProduceRequest.builder()
                    .setPartitionId(0)
                    .setKey(
                        ProduceRequest.ProduceRequestData.builder()
                            .setFormat(EmbeddedFormat.BINARY)
                            .setData(IntNode.valueOf(2 * i))
                            .build())
                    .setValue(
                        ProduceRequest.ProduceRequestData.builder()
                            .setFormat(EmbeddedFormat.BINARY)
                            .setData(IntNode.valueOf((2 * i) + 1))
                            .build())
                    .setOriginalSize(0L)
                    .build());
        }

        // The body is the serialized requests written back to back, with no separator.
        StringBuilder batch = new StringBuilder();
        ObjectMapper objectMapper = getObjectMapper();
        for (ProduceRequest produceRequest : requests) {
            batch.append(objectMapper.writeValueAsString(produceRequest));
        }

        Response response =
            request("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .accept("application/json")
                .post(Entity.entity(batch.toString(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ImmutableList<ErrorResponse> errors = readErrorResponses(response);
        for (int i = 0; i < batchSize; i++) {
            Assertions.assertEquals(400, errors.get(i).getErrorCode());
        }
    }

    /**
     * Produces a single BINARY record whose value is 20971420 bytes long and verifies that the
     * size reported in the response, the stored key, the stored value size and the stored value
     * bytes all match what was sent.
     */
    @Test
    public void produceBinaryWithLargerSizeMessage() throws Exception {
        final int valueSize = 20971420;
        String clusterId = getClusterId();
        ByteString key = ByteString.copyFromUtf8("foo");
        byte[] value = generateBinaryData(valueSize);

        ProduceRequest produceRequest =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(key.toByteArray()))
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(value))
                        .build())
                .setOriginalSize(0L)
                .build();

        Response response =
            request("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .accept("application/json")
                .post(Entity.entity(produceRequest, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse actual = readProduceResponse(response);
        Assertions.assertTrue(actual.getValue().isPresent());
        Assertions.assertEquals(valueSize, actual.getValue().get().getSize());

        ConsumerRecord<byte[], byte[]> produced =
            getMessage(
                TOPIC_NAME,
                actual.getPartitionId(),
                actual.getOffset(),
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer());
        Assertions.assertEquals(key, ByteString.copyFrom(produced.key()));
        Assertions.assertEquals(valueSize, produced.serializedValueSize());
        // Compare the arrays element by element instead of rendering two ~20 MB arrays as
        // strings just to compare them.
        Assertions.assertArrayEquals(value, produced.value());
    }

    /**
     * Reads the entity of {@code response} as a {@link ProduceResponse}.
     *
     * <p>The entity is buffered before the first read so that it can be read a second time: if
     * the first read fails with a {@link ProcessingException}, the entity is read again as an
     * {@link ErrorResponse} and its string form becomes the message of the thrown exception.
     *
     * @param response the HTTP response returned by the produce endpoint
     * @return the deserialized produce response
     * @throws RuntimeException wrapping the {@link ProcessingException} when the entity cannot be
     *     read as a {@link ProduceResponse}
     */
    private static ProduceResponse readProduceResponse(Response response) {
        response.bufferEntity();
        ProduceResponse parsed;
        try {
            parsed = response.readEntity(ProduceResponse.class);
        } catch (ProcessingException parseFailure) {
            ErrorResponse error = response.readEntity(ErrorResponse.class);
            throw new RuntimeException(error.toString(), parseFailure);
        }
        return parsed;
    }

    /**
     * Reads the entity of {@code response} as a sequence of {@link ProduceResponse} values and
     * copies them, in iteration order, into an immutable list.
     *
     * @param response the HTTP response returned by the produce endpoint
     * @return every produce response yielded by the entity's {@link MappingIterator}
     */
    private static ImmutableList<ProduceResponse> readProduceResponses(Response response) {
        // Keep the iterator parameterized; a raw Iterator cast would discard the element type.
        MappingIterator<ProduceResponse> responses =
            response.readEntity(new GenericType<MappingIterator<ProduceResponse>>() {});
        return ImmutableList.copyOf(responses);
    }

    /**
     * Reads the entity of {@code response} as a sequence of {@link ErrorResponse} values and
     * copies them, in iteration order, into an immutable list.
     *
     * @param response the HTTP response returned by the produce endpoint
     * @return every error response yielded by the entity's {@link MappingIterator}
     */
    private static ImmutableList<ErrorResponse> readErrorResponses(Response response) {
        // Keep the iterator parameterized; a raw Iterator cast would discard the element type.
        MappingIterator<ErrorResponse> errors =
            response.readEntity(new GenericType<MappingIterator<ErrorResponse>>() {});
        return ImmutableList.copyOf(errors);
    }

    /**
     * Builds a byte array of the requested length in which every element is 1.
     *
     * @param i the length of the array to create
     * @return a new array of {@code i} bytes, each equal to 1
     */
    private static byte[] generateBinaryData(int i) {
        final byte[] payload = new byte[i];
        for (int offset = 0; offset < payload.length; offset++) {
            payload[offset] = (byte) 1;
        }
        return payload;
    }
}
