package io.confluent.kafkarest.resources.v3;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.controllers.ProduceController;
import io.confluent.kafkarest.controllers.RecordSerializer;
import io.confluent.kafkarest.controllers.SchemaManager;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.ProduceResult;
import io.confluent.kafkarest.entities.v3.ProduceBatchRequest;
import io.confluent.kafkarest.entities.v3.ProduceBatchRequestEntry;
import io.confluent.kafkarest.entities.v3.ProduceBatchResponse;
import io.confluent.kafkarest.entities.v3.ProduceBatchResponseFailureEntry;
import io.confluent.kafkarest.entities.v3.ProduceBatchResponseSuccessEntry;
import io.confluent.kafkarest.entities.v3.ProduceRequest;
import io.confluent.kafkarest.entities.v3.ProduceResponse;
import io.confluent.kafkarest.exceptions.BadRequestException;
import io.confluent.kafkarest.ratelimit.RequestRateLimiter;
import io.confluent.kafkarest.response.FakeAsyncResponse;
import io.confluent.rest.exceptions.RestConstraintViolationException;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import javax.inject.Provider;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.common.metrics.Metrics;
import org.easymock.EasyMock;
import org.easymock.Mock;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/confluent/kafkarest/resources/v3/ProduceBatchActionTest.class */
public class ProduceBatchActionTest {

    @Mock
    private ProduceController produceController;
    private ProduceBatchAction produceBatchAction;
    private KafkaRestConfig kafkaRestConfig;

    @BeforeEach
    public void setUp() {
        Properties properties = new Properties();
        properties.put("api.v3.produce.rate.limit.max.requests.per.sec", Integer.toString(10000));
        properties.put("api.v3.produce.rate.limit.max.bytes.per.sec", Integer.toString(999999999));
        properties.put("api.v3.produce.rate.limit.enabled", "true");
        properties.put("api.v3.produce.rate.limit.cache.expiry.ms", Integer.toString(3600000));
        Provider provider = (Provider) EasyMock.mock(Provider.class);
        Provider provider2 = (Provider) EasyMock.mock(Provider.class);
        Provider provider3 = (Provider) EasyMock.mock(Provider.class);
        Provider provider4 = (Provider) EasyMock.mock(Provider.class);
        RequestRateLimiter requestRateLimiter = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter2 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter3 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter4 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        EasyMock.expect(provider.get()).andReturn(requestRateLimiter).anyTimes();
        EasyMock.expect(provider2.get()).andReturn(requestRateLimiter2).anyTimes();
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter3).anyTimes();
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter4).anyTimes();
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        EasyMock.expectLastCall().andVoid().anyTimes();
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        EasyMock.expectLastCall().andVoid().anyTimes();
        requestRateLimiter4.rateLimit(EasyMock.anyInt());
        EasyMock.expectLastCall().andVoid().anyTimes();
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        EasyMock.expectLastCall().andVoid().anyTimes();
        EasyMock.replay(new Object[]{provider, provider2, requestRateLimiter, requestRateLimiter2, requestRateLimiter3, requestRateLimiter4, provider3, provider4});
        ProduceRateLimiters produceRateLimiters = new ProduceRateLimiters(provider, provider2, provider3, provider4, Boolean.valueOf(Boolean.parseBoolean(properties.getProperty("api.v3.produce.rate.limit.enabled"))), Duration.ofMillis(Integer.parseInt(properties.getProperty("api.v3.produce.rate.limit.cache.expiry.ms"))));
        Provider provider5 = (Provider) EasyMock.mock(Provider.class);
        SchemaManager schemaManager = (SchemaManager) EasyMock.mock(SchemaManager.class);
        EasyMock.expect(provider5.get()).andReturn(schemaManager).anyTimes();
        EasyMock.expect(schemaManager.getSchema("topicName", Optional.of(EmbeddedFormat.AVRO), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of("bob"), true)).andThrow(Errors.invalidPayloadException("Schema Registry must be configured when using schemas."));
        EasyMock.replay(new Object[]{provider5, schemaManager});
        Provider<RecordSerializer> recordSerializerProvider = getRecordSerializerProvider(false);
        Provider provider6 = (Provider) EasyMock.mock(Provider.class);
        this.produceController = (ProduceController) EasyMock.mock(ProduceController.class);
        EasyMock.expect(provider6.get()).andReturn(this.produceController).anyTimes();
        EasyMock.replay(new Object[]{provider6});
        ListeningExecutorService newDirectExecutorService = MoreExecutors.newDirectExecutorService();
        this.kafkaRestConfig = new KafkaRestConfig();
        this.kafkaRestConfig.setMetrics(new Metrics());
        this.produceBatchAction = new ProduceBatchAction(provider5, recordSerializerProvider, provider6, () -> {
            return new ProducerMetrics(this.kafkaRestConfig, Collections.emptyMap());
        }, produceRateLimiters, Integer.valueOf("10"), newDirectExecutorService);
        produceRateLimiters.clear();
    }

    @AfterAll
    public static void cleanUp() {
        try {
            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName("kafka.rest:type=produce-api-metrics"));
        } catch (MalformedObjectNameException | InstanceNotFoundException | MBeanRegistrationException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testProduceNoSchemaRegistryDefined() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.completeExceptionally(new RestConstraintViolationException("Payload error. Schema Registry must be configured when using schemas.", 42206));
        EasyMock.expect(this.produceController.produce((String) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Multimap) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Instant) EasyMock.anyObject())).andReturn(completableFuture);
        EasyMock.replay(new Object[]{this.produceController});
        ProduceBatchRequest build = ProduceBatchRequest.builder().setEntries(Collections.singletonList(ProduceBatchRequestEntry.builder().setId(new TextNode("1")).setOriginalSize(25L).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setData(TextNode.valueOf("bob")).setRawSchema("bob").build()).build())).build();
        ProduceBatchResponse build2 = ProduceBatchResponse.builder().setSuccesses(Collections.emptyList()).setFailures(Collections.singletonList(ProduceBatchResponseFailureEntry.builder().setId("1").setErrorCode(422).setMessage("Error: 42206 : Payload error. Schema Registry must be configured when using schemas.").build())).build();
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        this.produceBatchAction.produce(fakeAsyncResponse, "clusterId", "topicName", build);
        Assertions.assertEquals(207, fakeAsyncResponse.getStatus());
        Assertions.assertEquals(build2, fakeAsyncResponse.getValue());
        Assertions.assertNull(fakeAsyncResponse.getException());
    }

    @Test
    public void testProduceNegativePartitionId() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.completeExceptionally(Errors.partitionNotFoundException());
        EasyMock.expect(this.produceController.produce((String) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Multimap) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Instant) EasyMock.anyObject())).andReturn(completableFuture);
        EasyMock.replay(new Object[]{this.produceController});
        ProduceBatchRequest build = ProduceBatchRequest.builder().setEntries(Collections.singletonList(ProduceBatchRequestEntry.builder().setId(new TextNode("1")).setPartitionId(-1).setOriginalSize(25L).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setData(TextNode.valueOf("bob")).setRawSchema("bob").build()).build())).build();
        ProduceBatchResponse build2 = ProduceBatchResponse.builder().setSuccesses(Collections.emptyList()).setFailures(Collections.singletonList(ProduceBatchResponseFailureEntry.builder().setId("1").setErrorCode(40402).setMessage("Partition not found.").build())).build();
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        this.produceBatchAction.produce(fakeAsyncResponse, "clusterId", "topicName", build);
        Assertions.assertEquals(207, fakeAsyncResponse.getStatus());
        Assertions.assertEquals(build2, fakeAsyncResponse.getValue());
        Assertions.assertNull(fakeAsyncResponse.getException());
    }

    @Test
    public void testProduceBatch() throws Exception {
        for (int i = 0; i < 4; i++) {
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.complete(ProduceResult.create(0, i, Instant.now(), 25, 0, Instant.now()));
            EasyMock.expect(this.produceController.produce((String) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Multimap) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Instant) EasyMock.anyObject())).andReturn(completableFuture);
        }
        EasyMock.replay(new Object[]{this.produceController});
        ArrayList arrayList = new ArrayList(4);
        ArrayList arrayList2 = new ArrayList(4);
        for (int i2 = 0; i2 < 4; i2++) {
            arrayList.add(ProduceBatchRequestEntry.builder().setId(new TextNode(Integer.toString(i2))).setOriginalSize(25L).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).build());
            arrayList2.add(ProduceBatchResponseSuccessEntry.builder().setId(Integer.toString(i2)).setClusterId("clusterId").setTopicName("topicName").setPartitionId(0).setOffset(Long.valueOf(i2)).setKey(ProduceResponse.ProduceResponseData.builder().setSize(25).build()).build());
        }
        ProduceBatchRequest build = ProduceBatchRequest.builder().setEntries(arrayList).build();
        ProduceBatchResponse build2 = ProduceBatchResponse.builder().setSuccesses(arrayList2).setFailures(Collections.emptyList()).build();
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        this.produceBatchAction.produce(fakeAsyncResponse, "clusterId", "topicName", build);
        Assertions.assertEquals(207, fakeAsyncResponse.getStatus());
        assertProduceBatchResponse(build2, (ProduceBatchResponse) fakeAsyncResponse.getValue());
        Assertions.assertNull(fakeAsyncResponse.getException());
    }

    @Test
    public void testProduceBatchNonnumericIds() throws Exception {
        EasyMock.expect(this.produceController.produce((String) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Multimap) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Instant) EasyMock.anyObject())).andReturn(CompletableFuture.completedFuture(ProduceResult.create(0, 0L, Instant.now(), 25, 0, Instant.now()))).andReturn(CompletableFuture.completedFuture(ProduceResult.create(0, 1L, Instant.now(), 25, 0, Instant.now())));
        EasyMock.replay(new Object[]{this.produceController});
        ProduceBatchRequest build = ProduceBatchRequest.builder().setEntries(ImmutableList.of(ProduceBatchRequestEntry.builder().setId(TextNode.valueOf("entry-1")).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf("0-9_a-z_A-Z")).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build())).build();
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        this.produceBatchAction.produce(fakeAsyncResponse, "clusterId", "topicName", build);
        ProduceBatchResponse build2 = ProduceBatchResponse.builder().setSuccesses(ImmutableList.of(ProduceBatchResponseSuccessEntry.builder().setId("entry-1").setClusterId("clusterId").setTopicName("topicName").setPartitionId(0).setOffset(0L).setKey(ProduceResponse.ProduceResponseData.builder().setSize(25).build()).build(), ProduceBatchResponseSuccessEntry.builder().setId("0-9_a-z_A-Z").setClusterId("clusterId").setTopicName("topicName").setPartitionId(0).setOffset(1L).setKey(ProduceResponse.ProduceResponseData.builder().setSize(25).build()).build())).setFailures(Collections.emptyList()).build();
        Assertions.assertEquals(207, fakeAsyncResponse.getStatus());
        assertProduceBatchResponse(build2, (ProduceBatchResponse) fakeAsyncResponse.getValue());
        Assertions.assertNull(fakeAsyncResponse.getException());
    }

    @Test
    public void testProduceBatchNullRequest() throws Exception {
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        RestConstraintViolationException assertThrows = Assertions.assertThrows(RestConstraintViolationException.class, () -> {
            this.produceBatchAction.produce(fakeAsyncResponse, "clusterId", "topicName", (ProduceBatchRequest) null);
        });
        Assertions.assertEquals(42208, assertThrows.getErrorCode());
        Assertions.assertEquals("Request body is empty. Data is required.", assertThrows.getMessage());
        Assertions.assertEquals(422, assertThrows.getStatus());
    }

    @Test
    public void testProduceBatchEmptyRequest() throws Exception {
        ProduceBatchRequest build = ProduceBatchRequest.builder().setEntries(Collections.emptyList()).build();
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        RestConstraintViolationException assertThrows = Assertions.assertThrows(RestConstraintViolationException.class, () -> {
            this.produceBatchAction.produce(fakeAsyncResponse, "clusterId", "topicName", build);
        });
        Assertions.assertEquals(42208, assertThrows.getErrorCode());
        Assertions.assertEquals("Empty batch.", assertThrows.getMessage());
        Assertions.assertEquals(422, assertThrows.getStatus());
    }

    @Test
    public void testProduceBatchOversizeRequest() throws Exception {
        ProduceBatchRequest build = ProduceBatchRequest.builder().setEntries(ImmutableList.of(ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(0))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(1))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(2))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(3))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(4))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(5))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(6))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(7))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(8))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(9))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(10))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build())).build();
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        RestConstraintViolationException assertThrows = Assertions.assertThrows(RestConstraintViolationException.class, () -> {
            this.produceBatchAction.produce(fakeAsyncResponse, "clusterId", "topicName", build);
        });
        Assertions.assertEquals(42208, assertThrows.getErrorCode());
        Assertions.assertEquals("Too many entries in batch.", assertThrows.getMessage());
        Assertions.assertEquals(422, assertThrows.getStatus());
    }

    @Test
    public void testProduceBatchWithDuplicateId() throws Exception {
        ProduceBatchRequest build = ProduceBatchRequest.builder().setEntries(ImmutableList.of(ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(0))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build(), ProduceBatchRequestEntry.builder().setId(TextNode.valueOf(Integer.toString(0))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build())).build();
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        RestConstraintViolationException assertThrows = Assertions.assertThrows(RestConstraintViolationException.class, () -> {
            this.produceBatchAction.produce(fakeAsyncResponse, "clusterId", "topicName", build);
        });
        Assertions.assertEquals(42208, assertThrows.getErrorCode());
        Assertions.assertEquals("Batch entry IDs are not distinct.", assertThrows.getMessage());
        Assertions.assertEquals(422, assertThrows.getStatus());
    }

    @Test
    public void testProduceBatchFailed() throws Exception {
        for (int i = 0; i < 4; i++) {
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.completeExceptionally(new BadRequestException("Invalid base64 string"));
            EasyMock.expect(this.produceController.produce((String) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Multimap) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Instant) EasyMock.anyObject())).andReturn(completableFuture);
        }
        EasyMock.replay(new Object[]{this.produceController});
        ArrayList arrayList = new ArrayList(4);
        ArrayList arrayList2 = new ArrayList(4);
        for (int i2 = 0; i2 < 4; i2++) {
            arrayList.add(ProduceBatchRequestEntry.builder().setId(new TextNode(Integer.toString(i2))).setOriginalSize(25L).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(TextNode.valueOf("fooba")).build()).build());
            arrayList2.add(ProduceBatchResponseFailureEntry.builder().setId(Integer.toString(i2)).setErrorCode(400).setMessage("Bad Request: Invalid base64 string").build());
        }
        ProduceBatchRequest build = ProduceBatchRequest.builder().setEntries(arrayList).build();
        ProduceBatchResponse build2 = ProduceBatchResponse.builder().setSuccesses(Collections.emptyList()).setFailures(arrayList2).build();
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        this.produceBatchAction.produce(fakeAsyncResponse, "clusterId", "topicName", build);
        Assertions.assertEquals(207, fakeAsyncResponse.getStatus());
        assertProduceBatchResponse(build2, (ProduceBatchResponse) fakeAsyncResponse.getValue());
        Assertions.assertNull(fakeAsyncResponse.getException());
    }

    @Test
    public void testProduceBatchPartialFailure() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.completeExceptionally(new BadRequestException("Invalid base64 string"));
        EasyMock.expect(this.produceController.produce((String) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Multimap) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Instant) EasyMock.anyObject())).andReturn(CompletableFuture.completedFuture(ProduceResult.create(0, 0L, Instant.now(), 25, 0, Instant.now()))).andReturn(completableFuture);
        EasyMock.replay(new Object[]{this.produceController});
        ProduceBatchRequest build = ProduceBatchRequest.builder().setEntries(ImmutableList.of(ProduceBatchRequestEntry.builder().setId(new TextNode("0")).setOriginalSize(25L).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).build(), ProduceBatchRequestEntry.builder().setId(new TextNode("1")).setOriginalSize(25L).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(TextNode.valueOf("fooba")).build()).build())).build();
        ProduceBatchResponse build2 = ProduceBatchResponse.builder().setSuccesses(Collections.singletonList(ProduceBatchResponseSuccessEntry.builder().setId("0").setClusterId("clusterId").setTopicName("topicName").setPartitionId(0).setOffset(0L).setKey(ProduceResponse.ProduceResponseData.builder().setSize(25).build()).build())).setFailures(Collections.singletonList(ProduceBatchResponseFailureEntry.builder().setId("1").setErrorCode(400).setMessage("Bad Request: Invalid base64 string").build())).build();
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        this.produceBatchAction.produce(fakeAsyncResponse, "clusterId", "topicName", build);
        Assertions.assertEquals(207, fakeAsyncResponse.getStatus());
        assertProduceBatchResponse(build2, (ProduceBatchResponse) fakeAsyncResponse.getValue());
        Assertions.assertNull(fakeAsyncResponse.getException());
    }

    @Test
    public void testProduceBatchNullIdFailure() throws Exception {
        RestConstraintViolationException assertThrows = Assertions.assertThrows(RestConstraintViolationException.class, () -> {
            ProduceBatchRequestEntry.builder().setId(NullNode.instance).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build();
        });
        Assertions.assertEquals(42208, assertThrows.getErrorCode());
        Assertions.assertEquals("Batch entry identifier is not a valid string.", assertThrows.getMessage());
        Assertions.assertEquals(422, assertThrows.getStatus());
    }

    @Test
    public void testProduceBatchIntegerIdFailure() throws Exception {
        RestConstraintViolationException assertThrows = Assertions.assertThrows(RestConstraintViolationException.class, () -> {
            ProduceBatchRequestEntry.builder().setId(IntNode.valueOf(0)).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build();
        });
        Assertions.assertEquals(42208, assertThrows.getErrorCode());
        Assertions.assertEquals("Batch entry identifier is not a valid string.", assertThrows.getMessage());
        Assertions.assertEquals(422, assertThrows.getStatus());
    }

    @Test
    public void testProduceBatchEmptyStringIdFailure() throws Exception {
        RestConstraintViolationException assertThrows = Assertions.assertThrows(RestConstraintViolationException.class, () -> {
            ProduceBatchRequestEntry.builder().setId(TextNode.valueOf("")).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build();
        });
        Assertions.assertEquals(42208, assertThrows.getErrorCode());
        Assertions.assertEquals("Batch entry identifier is not a valid string.", assertThrows.getMessage());
        Assertions.assertEquals(422, assertThrows.getStatus());
    }

    @Test
    public void testProduceBatchLongStringIdFailure() throws Exception {
        RestConstraintViolationException assertThrows = Assertions.assertThrows(RestConstraintViolationException.class, () -> {
            ProduceBatchRequestEntry.builder().setId(TextNode.valueOf("012345678901234567890123456789012345678901234567890123456789012345678901234567890")).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build();
        });
        Assertions.assertEquals(42208, assertThrows.getErrorCode());
        Assertions.assertEquals("Batch entry identifier is not a valid string.", assertThrows.getMessage());
        Assertions.assertEquals(422, assertThrows.getStatus());
    }

    @Test
    public void testProduceBatchInvalidStringIdFailure() throws Exception {
        RestConstraintViolationException assertThrows = Assertions.assertThrows(RestConstraintViolationException.class, () -> {
            ProduceBatchRequestEntry.builder().setId(TextNode.valueOf("€#£")).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bob")).build()).setOriginalSize(25L).build();
        });
        Assertions.assertEquals(42208, assertThrows.getErrorCode());
        Assertions.assertEquals("Batch entry identifier is not a valid string.", assertThrows.getMessage());
        Assertions.assertEquals(422, assertThrows.getStatus());
    }

    private static Provider<RecordSerializer> getRecordSerializerProvider(boolean z) {
        Provider<RecordSerializer> provider = (Provider) EasyMock.mock(Provider.class);
        RecordSerializer recordSerializer = (RecordSerializer) EasyMock.mock(RecordSerializer.class);
        EasyMock.expect(provider.get()).andReturn(recordSerializer).anyTimes();
        if (z) {
            EasyMock.expect(recordSerializer.serialize((EmbeddedFormat) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (JsonNode) EasyMock.anyObject(), EasyMock.anyBoolean())).andThrow(Errors.messageSerializationException("Schema Registry not defined, no Schema Registry client available to deserialize message.")).anyTimes();
        } else {
            EasyMock.expect(recordSerializer.serialize((EmbeddedFormat) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (JsonNode) EasyMock.anyObject(), EasyMock.anyBoolean())).andReturn(Optional.empty()).anyTimes();
        }
        EasyMock.replay(new Object[]{provider, recordSerializer});
        EasyMock.verify(new Object[]{recordSerializer});
        return provider;
    }

    private void assertProduceBatchResponse(ProduceBatchResponse produceBatchResponse, ProduceBatchResponse produceBatchResponse2) {
        int size = produceBatchResponse2.getSuccesses().size();
        Assertions.assertEquals(produceBatchResponse.getSuccesses().size(), produceBatchResponse2.getSuccesses().size());
        for (int i = 0; i < size; i++) {
            ProduceBatchResponseSuccessEntry produceBatchResponseSuccessEntry = (ProduceBatchResponseSuccessEntry) produceBatchResponse.getSuccesses().get(i);
            ProduceBatchResponseSuccessEntry produceBatchResponseSuccessEntry2 = (ProduceBatchResponseSuccessEntry) produceBatchResponse2.getSuccesses().get(i);
            Assertions.assertEquals(produceBatchResponseSuccessEntry.getId(), produceBatchResponseSuccessEntry2.getId());
            Assertions.assertEquals(produceBatchResponseSuccessEntry.getClusterId(), produceBatchResponseSuccessEntry2.getClusterId());
            Assertions.assertEquals(produceBatchResponseSuccessEntry.getTopicName(), produceBatchResponseSuccessEntry2.getTopicName());
            Assertions.assertEquals(produceBatchResponseSuccessEntry.getPartitionId(), produceBatchResponseSuccessEntry2.getPartitionId());
            Assertions.assertEquals(produceBatchResponseSuccessEntry.getOffset(), produceBatchResponseSuccessEntry2.getOffset());
        }
        int size2 = produceBatchResponse2.getFailures().size();
        Assertions.assertEquals(produceBatchResponse.getFailures().size(), produceBatchResponse2.getFailures().size());
        for (int i2 = 0; i2 < size2; i2++) {
            ProduceBatchResponseFailureEntry produceBatchResponseFailureEntry = (ProduceBatchResponseFailureEntry) produceBatchResponse.getFailures().get(i2);
            ProduceBatchResponseFailureEntry produceBatchResponseFailureEntry2 = (ProduceBatchResponseFailureEntry) produceBatchResponse2.getFailures().get(i2);
            Assertions.assertEquals(produceBatchResponseFailureEntry.getId(), produceBatchResponseFailureEntry2.getId());
            Assertions.assertEquals(produceBatchResponseFailureEntry.getErrorCode(), produceBatchResponseFailureEntry2.getErrorCode());
            Assertions.assertEquals(produceBatchResponseFailureEntry.getMessage(), produceBatchResponseFailureEntry2.getMessage());
        }
    }
}
