package io.confluent.kafkarest.resources.v3;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.controllers.ProduceController;
import io.confluent.kafkarest.controllers.RecordSerializer;
import io.confluent.kafkarest.controllers.SchemaManager;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.ProduceResult;
import io.confluent.kafkarest.entities.v3.ProduceRequest;
import io.confluent.kafkarest.entities.v3.ProduceResponse;
import io.confluent.kafkarest.exceptions.v3.ErrorResponse;
import io.confluent.kafkarest.ratelimit.RateLimitExceededException;
import io.confluent.kafkarest.ratelimit.RequestRateLimiter;
import io.confluent.kafkarest.response.ChunkedOutputFactory;
import io.confluent.kafkarest.response.FakeAsyncResponse;
import io.confluent.kafkarest.response.JsonStream;
import io.confluent.kafkarest.response.StreamingResponse;
import io.confluent.kafkarest.response.StreamingResponseFactory;
import io.confluent.rest.exceptions.RestConstraintViolationException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import javax.inject.Provider;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.common.metrics.Metrics;
import org.easymock.EasyMock;
import org.glassfish.jersey.server.ChunkedOutput;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/confluent/kafkarest/resources/v3/ProduceActionTest.class */
public class ProduceActionTest {
    private static final Duration FIVE_SECONDS_MS = Duration.ofMillis(5000);

    @AfterAll
    public static void cleanUp() {
        try {
            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName("kafka.rest:type=produce-api-metrics"));
        } catch (MalformedObjectNameException | InstanceNotFoundException | MBeanRegistrationException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void produceNoSchemaRegistryDefined() throws Exception {
        Properties properties = new Properties();
        properties.put("api.v3.produce.rate.limit.max.requests.per.sec", "100");
        properties.put("api.v3.produce.rate.limit.max.bytes.per.sec", Integer.toString(999999999));
        properties.put("api.v3.produce.rate.limit.enabled", "true");
        properties.put("api.v3.produce.rate.limit.cache.expiry.ms", "3600000");
        properties.put("schema.registry.url", "");
        ChunkedOutputFactory chunkedOutputFactory = (ChunkedOutputFactory) EasyMock.mock(ChunkedOutputFactory.class);
        ChunkedOutput<StreamingResponse.ResultOrError> chunkedOutput = getChunkedOutput(chunkedOutputFactory, 1);
        Provider provider = (Provider) EasyMock.mock(Provider.class);
        Provider provider2 = (Provider) EasyMock.mock(Provider.class);
        Provider provider3 = (Provider) EasyMock.mock(Provider.class);
        Provider provider4 = (Provider) EasyMock.mock(Provider.class);
        RequestRateLimiter requestRateLimiter = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter2 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter3 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter4 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        EasyMock.expect(provider.get()).andReturn(requestRateLimiter);
        EasyMock.expect(provider2.get()).andReturn(requestRateLimiter2);
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter3);
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter4);
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        requestRateLimiter4.rateLimit(EasyMock.anyInt());
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        EasyMock.replay(new Object[]{provider, provider2, requestRateLimiter, requestRateLimiter2, requestRateLimiter3, requestRateLimiter4, provider3, provider4});
        ProduceAction produceAction = getProduceAction(properties, chunkedOutputFactory, 1, provider, provider2, provider3, provider4, true);
        MappingIterator<ProduceRequest> produceRequestsMappingIteratorWithSchemaNeeded = getProduceRequestsMappingIteratorWithSchemaNeeded();
        StreamingResponse.ErrorHolder error = StreamingResponse.ResultOrError.error(ErrorResponse.create(422, "Error: 42206 : Payload error. Schema Registry must be configured when using schemas."));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(error);
        chunkedOutput.close();
        EasyMock.replay(new Object[]{chunkedOutput, chunkedOutputFactory});
        produceAction.produce(new FakeAsyncResponse(), "clusterId", "topicName", new JsonStream(() -> {
            return produceRequestsMappingIteratorWithSchemaNeeded;
        }));
        EasyMock.verify(new Object[]{produceRequestsMappingIteratorWithSchemaNeeded});
        EasyMock.verify(new Object[]{chunkedOutput});
    }

    @Test
    public void streamingRequests() throws Exception {
        Properties properties = new Properties();
        properties.put("api.v3.produce.rate.limit.max.requests.per.sec", Integer.toString(10000));
        properties.put("api.v3.produce.rate.limit.max.bytes.per.sec", Integer.toString(999999999));
        properties.put("api.v3.produce.rate.limit.enabled", "true");
        properties.put("api.v3.produce.rate.limit.cache.expiry.ms", Integer.toString(3600000));
        ChunkedOutputFactory chunkedOutputFactory = (ChunkedOutputFactory) EasyMock.mock(ChunkedOutputFactory.class);
        ChunkedOutput<StreamingResponse.ResultOrError> chunkedOutput = getChunkedOutput(chunkedOutputFactory, 1);
        Provider provider = (Provider) EasyMock.mock(Provider.class);
        Provider provider2 = (Provider) EasyMock.mock(Provider.class);
        Provider provider3 = (Provider) EasyMock.mock(Provider.class);
        Provider provider4 = (Provider) EasyMock.mock(Provider.class);
        RequestRateLimiter requestRateLimiter = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter2 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter3 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter4 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        EasyMock.expect(provider.get()).andReturn(requestRateLimiter3);
        EasyMock.expect(provider2.get()).andReturn(requestRateLimiter4);
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter);
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter2);
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        requestRateLimiter4.rateLimit(EasyMock.anyInt());
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        requestRateLimiter4.rateLimit(EasyMock.anyInt());
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter);
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter2);
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        requestRateLimiter4.rateLimit(EasyMock.anyInt());
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter);
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter2);
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        requestRateLimiter4.rateLimit(EasyMock.anyInt());
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter);
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter2);
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        EasyMock.replay(new Object[]{provider, provider2, requestRateLimiter3, requestRateLimiter4, requestRateLimiter, requestRateLimiter2, provider3, provider4});
        ProduceAction produceAction = getProduceAction(properties, chunkedOutputFactory, 4, provider, provider2, provider3, provider4);
        MappingIterator<ProduceRequest> streamingProduceRequestsMappingIterator = getStreamingProduceRequestsMappingIterator(4);
        StreamingResponse.ResultHolder result = StreamingResponse.ResultOrError.result(getProduceResponse(0));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(result);
        StreamingResponse.ResultHolder result2 = StreamingResponse.ResultOrError.result(getProduceResponse(1));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(result2);
        StreamingResponse.ResultHolder result3 = StreamingResponse.ResultOrError.result(getProduceResponse(2));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(result3);
        StreamingResponse.ResultHolder result4 = StreamingResponse.ResultOrError.result(getProduceResponse(3));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(result4);
        chunkedOutput.close();
        EasyMock.replay(new Object[]{chunkedOutput, chunkedOutputFactory});
        produceAction.produce(new FakeAsyncResponse(), "clusterId", "topicName", new JsonStream(() -> {
            return streamingProduceRequestsMappingIterator;
        }));
        EasyMock.verify(new Object[]{streamingProduceRequestsMappingIterator});
        EasyMock.verify(new Object[]{chunkedOutput});
    }

    @Test
    public void produceWithByteLimit() throws Exception {
        Properties properties = new Properties();
        properties.put("api.v3.produce.rate.limit.max.requests.per.sec", "100");
        properties.put("api.v3.produce.rate.limit.max.bytes.per.sec", Integer.toString(30));
        properties.put("api.v3.produce.rate.limit.cache.expiry.ms", "3600000");
        properties.put("api.v3.produce.rate.limit.enabled", "true");
        ChunkedOutputFactory chunkedOutputFactory = (ChunkedOutputFactory) EasyMock.mock(ChunkedOutputFactory.class);
        ChunkedOutput<StreamingResponse.ResultOrError> chunkedOutput = getChunkedOutput(chunkedOutputFactory, 2);
        Provider provider = (Provider) EasyMock.mock(Provider.class);
        Provider provider2 = (Provider) EasyMock.mock(Provider.class);
        Provider provider3 = (Provider) EasyMock.mock(Provider.class);
        Provider provider4 = (Provider) EasyMock.mock(Provider.class);
        RequestRateLimiter requestRateLimiter = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter2 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter3 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter4 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        EasyMock.expect(provider.get()).andReturn(requestRateLimiter3);
        EasyMock.expect(provider2.get()).andReturn(requestRateLimiter4);
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter);
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter2);
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        requestRateLimiter4.rateLimit(EasyMock.anyInt());
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter);
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter2);
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        requestRateLimiter4.rateLimit(EasyMock.anyInt());
        EasyMock.expectLastCall().andThrow(new RateLimitExceededException());
        EasyMock.replay(new Object[]{provider, provider2, requestRateLimiter3, requestRateLimiter4, requestRateLimiter, requestRateLimiter2, provider3, provider4});
        ProduceAction produceAction = getProduceAction(properties, chunkedOutputFactory, 1, provider, provider2, provider3, provider4);
        MappingIterator<ProduceRequest> produceRequestsMappingIterator = getProduceRequestsMappingIterator(2);
        StreamingResponse.ResultHolder result = StreamingResponse.ResultOrError.result(getProduceResponse(0));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(result);
        chunkedOutput.close();
        StreamingResponse.ErrorHolder error = StreamingResponse.ResultOrError.error(ErrorResponse.create(429, "Request rate limit exceeded: The rate limit of requests per second has been exceeded."));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(error);
        chunkedOutput.close();
        EasyMock.replay(new Object[]{chunkedOutput, chunkedOutputFactory});
        produceAction.produce(new FakeAsyncResponse(), "clusterId", "topicName", new JsonStream(() -> {
            return produceRequestsMappingIterator;
        }));
        produceAction.produce(new FakeAsyncResponse(), "clusterId", "topicName", new JsonStream(() -> {
            return produceRequestsMappingIterator;
        }));
        EasyMock.verify(new Object[]{produceRequestsMappingIterator, chunkedOutput, provider, provider2, requestRateLimiter3, requestRateLimiter4, requestRateLimiter, requestRateLimiter2});
    }

    @Test
    public void produceWithCountLimit() throws Exception {
        Properties properties = new Properties();
        properties.put("api.v3.produce.rate.limit.max.requests.per.sec", "100");
        properties.put("api.v3.produce.rate.limit.max.bytes.per.sec", Integer.toString(30));
        properties.put("api.v3.produce.rate.limit.cache.expiry.ms", "3600000");
        properties.put("api.v3.produce.rate.limit.enabled", "true");
        ChunkedOutputFactory chunkedOutputFactory = (ChunkedOutputFactory) EasyMock.mock(ChunkedOutputFactory.class);
        ChunkedOutput<StreamingResponse.ResultOrError> chunkedOutput = getChunkedOutput(chunkedOutputFactory, 2);
        Provider provider = (Provider) EasyMock.mock(Provider.class);
        Provider provider2 = (Provider) EasyMock.mock(Provider.class);
        Provider provider3 = (Provider) EasyMock.mock(Provider.class);
        Provider provider4 = (Provider) EasyMock.mock(Provider.class);
        RequestRateLimiter requestRateLimiter = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter2 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter3 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter4 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        EasyMock.expect(provider.get()).andReturn(requestRateLimiter3);
        EasyMock.expect(provider2.get()).andReturn(requestRateLimiter4);
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter);
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter2);
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        requestRateLimiter4.rateLimit(EasyMock.anyInt());
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter);
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter2);
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        EasyMock.expectLastCall().andThrow(new RateLimitExceededException());
        EasyMock.replay(new Object[]{provider, provider2, requestRateLimiter3, requestRateLimiter4, requestRateLimiter, requestRateLimiter2, provider3, provider4});
        ProduceAction produceAction = getProduceAction(properties, chunkedOutputFactory, 1, provider, provider2, provider3, provider4);
        MappingIterator<ProduceRequest> produceRequestsMappingIterator = getProduceRequestsMappingIterator(2);
        StreamingResponse.ResultHolder result = StreamingResponse.ResultOrError.result(getProduceResponse(0));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(result);
        chunkedOutput.close();
        StreamingResponse.ErrorHolder error = StreamingResponse.ResultOrError.error(ErrorResponse.create(429, "Request rate limit exceeded: The rate limit of requests per second has been exceeded."));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(error);
        chunkedOutput.close();
        EasyMock.replay(new Object[]{chunkedOutput, chunkedOutputFactory});
        produceAction.produce(new FakeAsyncResponse(), "clusterId", "topicName", new JsonStream(() -> {
            return produceRequestsMappingIterator;
        }));
        produceAction.produce(new FakeAsyncResponse(), "clusterId", "topicName", new JsonStream(() -> {
            return produceRequestsMappingIterator;
        }));
        EasyMock.verify(new Object[]{produceRequestsMappingIterator, chunkedOutput, provider, provider2, requestRateLimiter3, requestRateLimiter4, requestRateLimiter, requestRateLimiter2});
    }

    @Test
    public void produceNoLimit() throws Exception {
        Properties properties = new Properties();
        properties.put("api.v3.produce.rate.limit.max.requests.per.sec", "100");
        properties.put("api.v3.produce.rate.limit.max.bytes.per.sec", Integer.toString(30));
        properties.put("api.v3.produce.rate.limit.cache.expiry.ms", "3600000");
        properties.put("api.v3.produce.rate.limit.enabled", "falsse");
        ChunkedOutputFactory chunkedOutputFactory = (ChunkedOutputFactory) EasyMock.mock(ChunkedOutputFactory.class);
        ChunkedOutput<StreamingResponse.ResultOrError> chunkedOutput = getChunkedOutput(chunkedOutputFactory, 2);
        Provider provider = (Provider) EasyMock.mock(Provider.class);
        Provider provider2 = (Provider) EasyMock.mock(Provider.class);
        Provider provider3 = (Provider) EasyMock.mock(Provider.class);
        Provider provider4 = (Provider) EasyMock.mock(Provider.class);
        RequestRateLimiter requestRateLimiter = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter2 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter3 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter4 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        EasyMock.replay(new Object[]{provider, provider2, requestRateLimiter3, requestRateLimiter4, requestRateLimiter, requestRateLimiter2, provider3, provider4});
        ProduceAction produceAction = getProduceAction(properties, chunkedOutputFactory, 2, provider, provider2, provider3, provider4);
        MappingIterator<ProduceRequest> produceRequestsMappingIterator = getProduceRequestsMappingIterator(2);
        StreamingResponse.ResultHolder result = StreamingResponse.ResultOrError.result(getProduceResponse(0));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(result);
        chunkedOutput.close();
        StreamingResponse.ResultHolder result2 = StreamingResponse.ResultOrError.result(getProduceResponse(1));
        EasyMock.expect(Boolean.valueOf(chunkedOutput.isClosed())).andReturn(false);
        chunkedOutput.write(result2);
        chunkedOutput.close();
        EasyMock.replay(new Object[]{chunkedOutput, chunkedOutputFactory});
        produceAction.produce(new FakeAsyncResponse(), "clusterId", "topicName", new JsonStream(() -> {
            return produceRequestsMappingIterator;
        }));
        produceAction.produce(new FakeAsyncResponse(), "clusterId", "topicName", new JsonStream(() -> {
            return produceRequestsMappingIterator;
        }));
        EasyMock.verify(new Object[]{produceRequestsMappingIterator, chunkedOutput, provider, provider2, requestRateLimiter3, requestRateLimiter4, requestRateLimiter, requestRateLimiter2});
    }

    @Test
    public void testHasNextOnNullData() throws Exception {
        Properties properties = new Properties();
        properties.put("api.v3.produce.rate.limit.max.requests.per.sec", "100");
        properties.put("api.v3.produce.rate.limit.max.bytes.per.sec", Integer.toString(30));
        properties.put("api.v3.produce.rate.limit.cache.expiry.ms", "3600000");
        properties.put("api.v3.produce.rate.limit.enabled", "true");
        ChunkedOutputFactory chunkedOutputFactory = (ChunkedOutputFactory) EasyMock.mock(ChunkedOutputFactory.class);
        Provider provider = (Provider) EasyMock.mock(Provider.class);
        Provider provider2 = (Provider) EasyMock.mock(Provider.class);
        Provider provider3 = (Provider) EasyMock.mock(Provider.class);
        Provider provider4 = (Provider) EasyMock.mock(Provider.class);
        RequestRateLimiter requestRateLimiter = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter2 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter3 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        RequestRateLimiter requestRateLimiter4 = (RequestRateLimiter) EasyMock.mock(RequestRateLimiter.class);
        EasyMock.expect(provider.get()).andReturn(requestRateLimiter3);
        EasyMock.expect(provider2.get()).andReturn(requestRateLimiter4);
        EasyMock.expect(provider3.get()).andReturn(requestRateLimiter);
        EasyMock.expect(provider4.get()).andReturn(requestRateLimiter2);
        requestRateLimiter3.rateLimit(EasyMock.anyInt());
        requestRateLimiter4.rateLimit(EasyMock.anyInt());
        requestRateLimiter2.rateLimit(EasyMock.anyInt());
        requestRateLimiter.rateLimit(EasyMock.anyInt());
        ProduceAction produceAction = getProduceAction(properties, chunkedOutputFactory, 1, provider, provider2, provider3, provider4);
        FakeAsyncResponse fakeAsyncResponse = new FakeAsyncResponse();
        RestConstraintViolationException assertThrows = Assertions.assertThrows(RestConstraintViolationException.class, () -> {
            produceAction.produce(fakeAsyncResponse, "clusterId", "topicName", (JsonStream) null);
        });
        Assertions.assertEquals("Payload error. Null input provided. Data is required.", assertThrows.getMessage());
        Assertions.assertEquals(42206, assertThrows.getErrorCode());
    }

    private static Provider<RecordSerializer> getRecordSerializerProvider(boolean z) {
        Provider<RecordSerializer> provider = (Provider) EasyMock.mock(Provider.class);
        RecordSerializer recordSerializer = (RecordSerializer) EasyMock.mock(RecordSerializer.class);
        EasyMock.expect(provider.get()).andReturn(recordSerializer).anyTimes();
        if (z) {
            EasyMock.expect(recordSerializer.serialize((EmbeddedFormat) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (JsonNode) EasyMock.anyObject(), EasyMock.anyBoolean())).andThrow(Errors.messageSerializationException("Schema Registry not defined, no Schema Registry client available to deserialize message.")).anyTimes();
        } else {
            EasyMock.expect(recordSerializer.serialize((EmbeddedFormat) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (JsonNode) EasyMock.anyObject(), EasyMock.anyBoolean())).andReturn(Optional.empty()).anyTimes();
        }
        EasyMock.replay(new Object[]{provider, recordSerializer});
        EasyMock.verify(new Object[]{recordSerializer});
        return provider;
    }

    private static ProduceController getProduceControllerMock(Provider provider) {
        ProduceController produceController = (ProduceController) EasyMock.mock(ProduceController.class);
        EasyMock.expect(provider.get()).andReturn(produceController).anyTimes();
        return produceController;
    }

    private static MappingIterator<ProduceRequest> getProduceRequestsMappingIterator(int i) throws IOException {
        MappingIterator<ProduceRequest> mappingIterator = (MappingIterator) EasyMock.mock(MappingIterator.class);
        ProduceRequest build = ProduceRequest.builder().setOriginalSize(25L).build();
        for (int i2 = 0; i2 < i; i2++) {
            EasyMock.expect(Boolean.valueOf(mappingIterator.hasNext())).andReturn(true).times(1);
            EasyMock.expect(mappingIterator.nextValue()).andReturn(build).times(1);
            EasyMock.expect(Boolean.valueOf(mappingIterator.hasNext())).andReturn(false).times(1);
            mappingIterator.close();
        }
        EasyMock.replay(new Object[]{mappingIterator});
        return mappingIterator;
    }

    private static MappingIterator<ProduceRequest> getProduceRequestsMappingIteratorWithSchemaNeeded() throws IOException {
        MappingIterator<ProduceRequest> mappingIterator = (MappingIterator) EasyMock.mock(MappingIterator.class);
        ProduceRequest build = ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setData(TextNode.valueOf("bob")).setRawSchema("bob").build()).setOriginalSize(25L).build();
        EasyMock.expect(Boolean.valueOf(mappingIterator.hasNext())).andReturn(true).times(1);
        EasyMock.expect(mappingIterator.nextValue()).andReturn(build).times(1);
        EasyMock.expect(Boolean.valueOf(mappingIterator.hasNext())).andReturn(false).times(1);
        mappingIterator.close();
        EasyMock.replay(new Object[]{mappingIterator});
        return mappingIterator;
    }

    private static MappingIterator<ProduceRequest> getStreamingProduceRequestsMappingIterator(int i) throws IOException {
        MappingIterator<ProduceRequest> mappingIterator = (MappingIterator) EasyMock.mock(MappingIterator.class);
        for (int i2 = 0; i2 < i; i2++) {
            ProduceRequest build = ProduceRequest.builder().setOriginalSize(25L).build();
            EasyMock.expect(Boolean.valueOf(mappingIterator.hasNext())).andReturn(true).times(1);
            EasyMock.expect(mappingIterator.nextValue()).andReturn(build).times(1);
        }
        EasyMock.expect(Boolean.valueOf(mappingIterator.hasNext())).andReturn(false).times(1);
        mappingIterator.close();
        EasyMock.replay(new Object[]{mappingIterator});
        return mappingIterator;
    }

    private static ChunkedOutput<StreamingResponse.ResultOrError> getChunkedOutput(ChunkedOutputFactory chunkedOutputFactory, int i) {
        ChunkedOutput<StreamingResponse.ResultOrError> chunkedOutput = (ChunkedOutput) EasyMock.mock(ChunkedOutput.class);
        EasyMock.expect(chunkedOutputFactory.getChunkedOutput()).andReturn(chunkedOutput).times(i);
        return chunkedOutput;
    }

    private static ProduceResponse getProduceResponse(int i) {
        return getProduceResponse(i, Optional.empty());
    }

    private static ProduceResponse getProduceResponse(int i, Optional<Duration> optional) {
        return getProduceResponse(i, optional, 0);
    }

    private static ProduceResponse getProduceResponse(int i, Optional<Duration> optional, int i2) {
        return ProduceResponse.builder().setClusterId("clusterId").setTopicName("topicName").setPartitionId(i2).setOffset(i).setTimestamp(Instant.ofEpochMilli(0L)).setErrorCode(200).build();
    }

    private static CompletableFuture<ProduceResult> getProduceResultMock(int i, int i2) {
        ProduceResult produceResult = (ProduceResult) EasyMock.mock(ProduceResult.class);
        setExpectsForProduceResult(produceResult, i, i2);
        return CompletableFuture.completedFuture(produceResult);
    }

    private static void setExpectsForProduceResult(ProduceResult produceResult, long j, int i) {
        EasyMock.expect(Integer.valueOf(produceResult.getPartitionId())).andReturn(Integer.valueOf(i)).anyTimes();
        EasyMock.expect(Long.valueOf(produceResult.getOffset())).andReturn(Long.valueOf(j)).anyTimes();
        EasyMock.expect(produceResult.getTimestamp()).andReturn(Optional.of(Instant.ofEpochMilli(0L))).anyTimes();
        EasyMock.expect(Integer.valueOf(produceResult.getSerializedKeySize())).andReturn(1).anyTimes();
        EasyMock.expect(Integer.valueOf(produceResult.getSerializedValueSize())).andReturn(1).anyTimes();
        EasyMock.expect(produceResult.getCompletionTimestamp()).andReturn(Instant.now()).anyTimes();
        EasyMock.replay(new Object[]{produceResult});
    }

    private static void setupExpectsMockCallsForProduce(ProduceController produceController, int i, int i2) {
        for (int i3 = 0; i3 < i; i3++) {
            EasyMock.expect(produceController.produce((String) EasyMock.anyObject(), (String) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Multimap) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Optional) EasyMock.anyObject(), (Instant) EasyMock.anyObject())).andReturn(getProduceResultMock(i3, i2));
        }
    }

    private static ProduceAction getProduceAction(Properties properties, ChunkedOutputFactory chunkedOutputFactory, int i, Provider<RequestRateLimiter> provider, Provider<RequestRateLimiter> provider2, Provider<RequestRateLimiter> provider3, Provider<RequestRateLimiter> provider4) {
        return getProduceAction(properties, chunkedOutputFactory, i, provider, provider2, provider3, provider4, false);
    }

    private static ProduceAction getProduceAction(Properties properties, ChunkedOutputFactory chunkedOutputFactory, int i, Provider<RequestRateLimiter> provider, Provider<RequestRateLimiter> provider2, Provider<RequestRateLimiter> provider3, Provider<RequestRateLimiter> provider4, boolean z) {
        return getProduceAction(new ProduceRateLimiters(provider, provider2, provider3, provider4, Boolean.valueOf(Boolean.parseBoolean(properties.getProperty("api.v3.produce.rate.limit.enabled"))), Duration.ofMillis(Integer.parseInt(properties.getProperty("api.v3.produce.rate.limit.cache.expiry.ms")))), chunkedOutputFactory, i, 0, z);
    }

    private static ProduceAction getProduceAction(ProduceRateLimiters produceRateLimiters, ChunkedOutputFactory chunkedOutputFactory, int i, int i2, boolean z) {
        Provider provider = (Provider) EasyMock.mock(Provider.class);
        SchemaManager schemaManager = (SchemaManager) EasyMock.mock(SchemaManager.class);
        EasyMock.expect(provider.get()).andReturn(schemaManager);
        EasyMock.expect(schemaManager.getSchema("topicName", Optional.of(EmbeddedFormat.AVRO), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of("bob"), true)).andThrow(Errors.invalidPayloadException("Schema Registry must be configured when using schemas."));
        EasyMock.replay(new Object[]{provider, schemaManager});
        Provider<RecordSerializer> recordSerializerProvider = getRecordSerializerProvider(z);
        Provider provider2 = (Provider) EasyMock.mock(Provider.class);
        ProduceController produceControllerMock = getProduceControllerMock(provider2);
        setupExpectsMockCallsForProduce(produceControllerMock, i, i2);
        EasyMock.replay(new Object[]{provider2, produceControllerMock});
        StreamingResponseFactory streamingResponseFactory = new StreamingResponseFactory(chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_SECONDS_MS);
        ListeningExecutorService newDirectExecutorService = MoreExecutors.newDirectExecutorService();
        KafkaRestConfig kafkaRestConfig = new KafkaRestConfig();
        kafkaRestConfig.setMetrics(new Metrics());
        ProduceAction produceAction = new ProduceAction(provider, recordSerializerProvider, provider2, () -> {
            return new ProducerMetrics(kafkaRestConfig, Collections.emptyMap());
        }, streamingResponseFactory, produceRateLimiters, newDirectExecutorService);
        produceRateLimiters.clear();
        return produceAction;
    }
}
