package io.confluent.connect.s3;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.http.Fault;
import io.confluent.connect.s3.format.avro.AvroFormat;
import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
import io.confluent.connect.s3.format.json.JsonFormat;
import io.confluent.connect.s3.format.parquet.ParquetFormat;
import io.confluent.connect.s3.util.EmbeddedConnectUtils;
import io.confluent.connect.s3.util.S3Utils;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:io/confluent/connect/s3/S3SinkConnectorFaultyS3Test.class */
public class S3SinkConnectorFaultyS3Test extends TestWithMockedFaultyS3 {
    /** Value written to the connector's {@code tasks.max} property. */
    protected static final int MAX_TASKS = 1;
    /** Value written to the connector's {@code s3.part.size} property, in bytes (5 * 1024 * 1024). */
    protected static final int PART_SIZE = 5242880;
    /** Number of partitions each per-test topic is created with. */
    protected static final int TOPIC_PARTITIONS = 2;
    /** Prefix of the connector name; a random UUID is appended for every test. */
    protected static final String CONNECTOR_NAME = "s3-sink-";
    /** Smallest flush size exercised by the parameter matrix. */
    protected static final int FLUSH_SIZE_SMALL = 30;
    /** Largest flush size exercised by the parameter matrix (Parquet rows only). */
    protected static final int FLUSH_SIZE_HUGE = 1400;
    /** Embedded Connect cluster shared by all parameterized runs of this class. */
    protected static EmbeddedConnectCluster connect;
    /** Admin client obtained from the embedded Kafka cluster; used to delete per-test topics. */
    protected static Admin kafkaAdmin;
    /** Name of the connector created for the current test. */
    protected String connectorName;
    /** Name of the topic created for the current test. */
    protected String topicName;
    /** S3 client created in {@code setUp()}; used to create and inspect the test bucket. */
    protected AmazonS3 s3;
    /** Fault to inject into the mocked S3 for this parameterized run. */
    private final Failure failure;
    /** Class whose name is written to {@code format.class}. */
    private final Class<?> formatClass;
    /** Class whose name is written to {@code value.converter}. */
    private final Class<?> converterClass;
    /** Value written to {@code flush.size}; also the number of records the test produces. */
    private final int flushSize;
    /** Upper bound, in milliseconds, for waiting on files to appear in the bucket (60 seconds). */
    protected static final long S3_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60);
    /** Medium flush size used by the parameter matrix; also the size of {@link #TEST_MESSAGES}. */
    protected static final int FLUSH_SIZE_BIG = 70;
    /**
     * Pre-generated record values, cycled through when producing. Built during static
     * initialisation, so everything the generator needs must be usable at this point.
     */
    protected static final String[] TEST_MESSAGES = generateTestMessages(FLUSH_SIZE_BIG);

    /* loaded from: input_file:io/confluent/connect/s3/S3SinkConnectorFaultyS3Test$CompleteMultipartUploadRequestFailure.class */
    /**
     * Failure aimed at the request that completes a multipart upload: it hands
     * {@code injectS3FailureFor} a WireMock stub matching any POST that carries an
     * {@code uploadId} query parameter and answers it with the configured response.
     */
    private static class CompleteMultipartUploadRequestFailure implements Failure {
        private static final String UPLOAD_ID_PARAM = "uploadId";
        private static final String ANY_VALUE = ".*";

        private final ResponseDefinitionBuilder response;

        /**
         * @param responseDefinitionBuilder response (status or fault) returned for matching requests
         */
        public CompleteMultipartUploadRequestFailure(ResponseDefinitionBuilder responseDefinitionBuilder) {
            response = responseDefinitionBuilder;
        }

        /** Registers the stub with the mocked S3. */
        @Override // io.confluent.connect.s3.S3SinkConnectorFaultyS3Test.Failure
        public void inject() {
            TestWithMockedFaultyS3.injectS3FailureFor(
                WireMock.post(WireMock.anyUrl())
                    .withQueryParam(UPLOAD_ID_PARAM, WireMock.matching(ANY_VALUE))
                    .willReturn(response));
        }
    }

    /* loaded from: input_file:io/confluent/connect/s3/S3SinkConnectorFaultyS3Test$CreateMultipartUploadRequestFailure.class */
    /**
     * Failure aimed at the request that initiates a multipart upload: it hands
     * {@code injectS3FailureFor} a WireMock stub matching any POST whose {@code uploads}
     * query parameter is present with an empty value, and answers it with the configured
     * response.
     */
    private static class CreateMultipartUploadRequestFailure implements Failure {
        private static final String UPLOADS_PARAM = "uploads";
        /** Regex that only an empty string satisfies (end-of-input followed by start-of-input). */
        private static final String EMPTY_VALUE = "$^";

        private final ResponseDefinitionBuilder response;

        /**
         * @param responseDefinitionBuilder response (status or fault) returned for matching requests
         */
        public CreateMultipartUploadRequestFailure(ResponseDefinitionBuilder responseDefinitionBuilder) {
            response = responseDefinitionBuilder;
        }

        /** Registers the stub with the mocked S3. */
        @Override // io.confluent.connect.s3.S3SinkConnectorFaultyS3Test.Failure
        public void inject() {
            TestWithMockedFaultyS3.injectS3FailureFor(
                WireMock.post(WireMock.anyUrl())
                    .withQueryParam(UPLOADS_PARAM, WireMock.matching(EMPTY_VALUE))
                    .willReturn(response));
        }
    }

    /* loaded from: input_file:io/confluent/connect/s3/S3SinkConnectorFaultyS3Test$Failure.class */
    /**
     * A fault that can be installed into the mocked S3 endpoint before a test produces records.
     */
    interface Failure {
        /** Installs this fault; the test calls it once, before producing any records. */
        void inject();
    }

    /* loaded from: input_file:io/confluent/connect/s3/S3SinkConnectorFaultyS3Test$Failures.class */
    /**
     * Catalogue of the faults used by the parameter matrix: each of the three multipart
     * requests (create, upload part, complete) combined with each of three responses
     * (HTTP 429, HTTP 500, connection reset). Every constant forwards {@link #inject()} to
     * the request-specific failure it wraps.
     */
    enum Failures implements Failure {
        // --- HTTP 429 ---
        FAIL_CREATE_MULTIPART_UPLOAD_REQUEST_WITH_ERROR_429(
            new CreateMultipartUploadRequestFailure(WireMock.aResponse().withStatus(429))),
        FAIL_UPLOAD_PART_REQUEST_WITH_ERROR_429(
            new PartUploadRequestFailure(WireMock.aResponse().withStatus(429))),
        FAIL_COMPLETE_MULTIPART_UPLOAD_REQUEST_WITH_ERROR_429(
            new CompleteMultipartUploadRequestFailure(WireMock.aResponse().withStatus(429))),

        // --- HTTP 500 ---
        FAIL_CREATE_MULTIPART_UPLOAD_REQUEST_WITH_ERROR_500(
            new CreateMultipartUploadRequestFailure(WireMock.aResponse().withStatus(500))),
        FAIL_UPLOAD_PART_REQUEST_WITH_ERROR_500(
            new PartUploadRequestFailure(WireMock.aResponse().withStatus(500))),
        FAIL_COMPLETE_MULTIPART_UPLOAD_REQUEST_WITH_ERROR_500(
            new CompleteMultipartUploadRequestFailure(WireMock.aResponse().withStatus(500))),

        // --- connection reset by peer ---
        FAIL_CREATE_MULTIPART_UPLOAD_REQUEST_WITH_RESET(
            new CreateMultipartUploadRequestFailure(WireMock.aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER))),
        FAIL_UPLOAD_PART_REQUEST_WITH_RESET(
            new PartUploadRequestFailure(WireMock.aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER))),
        FAIL_COMPLETE_MULTIPART_UPLOAD_REQUEST_WITH_RESET(
            new CompleteMultipartUploadRequestFailure(WireMock.aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)));

        /** The request-specific failure this constant forwards to. */
        private final Failure delegate;

        Failures(Failure failure) {
            this.delegate = failure;
        }

        /** Forwards to the wrapped failure. */
        @Override // io.confluent.connect.s3.S3SinkConnectorFaultyS3Test.Failure
        public void inject() {
            delegate.inject();
        }
    }

    /* loaded from: input_file:io/confluent/connect/s3/S3SinkConnectorFaultyS3Test$PartUploadRequestFailure.class */
    /**
     * Failure aimed at the request that uploads a single part: it hands
     * {@code injectS3FailureFor} a WireMock stub matching any PUT that carries both a
     * {@code partNumber} and an {@code uploadId} query parameter and answers it with the
     * configured response.
     */
    private static class PartUploadRequestFailure implements Failure {
        private static final String PART_NUMBER_PARAM = "partNumber";
        private static final String UPLOAD_ID_PARAM = "uploadId";
        private static final String ANY_VALUE = ".*";

        private final ResponseDefinitionBuilder response;

        /**
         * @param responseDefinitionBuilder response (status or fault) returned for matching requests
         */
        public PartUploadRequestFailure(ResponseDefinitionBuilder responseDefinitionBuilder) {
            response = responseDefinitionBuilder;
        }

        /** Registers the stub with the mocked S3. */
        @Override // io.confluent.connect.s3.S3SinkConnectorFaultyS3Test.Failure
        public void inject() {
            TestWithMockedFaultyS3.injectS3FailureFor(
                WireMock.put(WireMock.anyUrl())
                    .withQueryParam(PART_NUMBER_PARAM, WireMock.matching(ANY_VALUE))
                    .withQueryParam(UPLOAD_ID_PARAM, WireMock.matching(ANY_VALUE))
                    .willReturn(response));
        }
    }

    /**
     * Receives one row of the parameter matrix returned by {@link #tests()}.
     *
     * @param cls     format class; its name is written to {@code format.class}
     * @param cls2    converter class; its name is written to {@code value.converter}
     * @param failure fault to inject into the mocked S3 before producing records
     * @param i       flush size; written to {@code flush.size} and used as the record count
     */
    public S3SinkConnectorFaultyS3Test(Class<?> cls, Class<?> cls2, Failure failure, int i) {
        // Class<?> instead of the raw type: same erasure, so the reflective call made by the
        // Parameterized runner is unaffected.
        this.formatClass = cls;
        this.converterClass = cls2;
        this.failure = failure;
        this.flushSize = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Extends the inherited connector properties with the settings for this parameterized
     * run: format, converter, flush size and topic come from the instance, the rest are fixed.
     *
     * @return the map returned by {@code super.createProps()}, with the entries below added
     */
    @Override // io.confluent.connect.s3.TestWithMockedS3, io.confluent.connect.s3.S3SinkConnectorTestBase
    public Map<String, String> createProps() {
        final Map<String, String> props = super.createProps();

        props.put("connector.class", S3SinkConnector.class.getName());
        props.put("tasks.max", String.valueOf(MAX_TASKS));
        props.put("format.class", formatClass.getName());
        props.put("value.converter", converterClass.getName());
        props.put("parquet.codec", "none");
        // Part retries are set to zero; the test name suggests recovery is then expected to
        // come from the Connect framework instead.
        props.put("s3.part.retries", "0");
        props.put("retry.backoff.ms", "100");
        props.put("s3.part.size", String.valueOf(PART_SIZE));
        props.put("flush.size", String.valueOf(flushSize));
        props.put("aws.access.key.id", "12345");
        props.put("aws.secret.access.key", "12345");
        props.put("topics", topicName);

        return props;
    }

    /**
     * Creates a fresh topic, runs the inherited set-up, creates the S3 client and bucket,
     * then registers a uniquely named connector and waits for it to start.
     *
     * @throws Exception if any set-up step fails
     */
    @Override // io.confluent.connect.s3.TestWithMockedFaultyS3, io.confluent.connect.s3.TestWithMockedS3, io.confluent.connect.s3.S3SinkConnectorTestBase
    @Before
    public void setUp() throws Exception {
        // The topic name is assigned before super.setUp() because createProps() reads it;
        // presumably the inherited set-up is what builds the properties.
        final String topic = "test-topic-" + UUID.randomUUID();
        topicName = topic;
        connect.kafka().createTopic(topic, TOPIC_PARTITIONS);

        super.setUp();

        s3 = newS3Client(connectorConfig);
        s3.createBucket("kafka.bucket");

        final String name = CONNECTOR_NAME + UUID.randomUUID();
        connectorName = name;
        connect.configureConnector(name, properties);

        final int expectedTasks = Math.min(TOPIC_PARTITIONS, MAX_TASKS);
        EmbeddedConnectUtils.waitForConnectorToStart(connect, name, expectedTasks);
    }

    /**
     * Removes the connector and topic created for the test, then runs the inherited
     * tear-down.
     *
     * <p>The steps are nested in {@code try/finally} so that a failure in one step (for
     * example deleting a connector that was never created because {@code setUp()} failed)
     * does not skip the remaining ones; in particular {@code super.tearDown()} always runs.
     *
     * @throws Exception if any tear-down step fails
     */
    @Override // io.confluent.connect.s3.TestWithMockedFaultyS3, io.confluent.connect.s3.TestWithMockedS3, io.confluent.connect.s3.S3SinkConnectorTestBase
    @After
    public void tearDown() throws Exception {
        try {
            connect.deleteConnector(this.connectorName);
        } finally {
            try {
                kafkaAdmin.deleteTopics(Collections.singleton(this.topicName));
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Builds and starts the embedded Connect cluster shared by every parameterized run, and
     * creates the Kafka admin client used to delete topics.
     */
    @BeforeClass
    public static void startConnect() {
        final EmbeddedConnectCluster.Builder clusterBuilder =
            new EmbeddedConnectCluster.Builder().name("s3-connect-cluster");
        // Assigned before start() so stopConnect() can still stop a cluster whose start failed.
        connect = clusterBuilder.build();
        connect.start();
        kafkaAdmin = connect.kafka().createAdminClient();
    }

    /**
     * Releases the class-level resources created by {@code startConnect()}.
     *
     * <p>The admin client is closed first (it was previously never closed), and the cluster
     * is stopped in a {@code finally} so it is shut down even if closing the client fails.
     * Both are null-checked because this method still runs when {@code startConnect()}
     * failed part-way through.
     */
    @AfterClass
    public static void stopConnect() {
        try {
            if (kafkaAdmin != null) {
                kafkaAdmin.close();
            }
        } finally {
            if (connect != null) {
                connect.stop();
            }
        }
    }

    /**
     * Builds the parameter matrix: 3 fault kinds x 4 format/converter pairs x 6 rows = 72
     * rows, each {@code {formatClass, converterClass, failure, flushSize}}.
     *
     * <p>Row order is: fault kind (429, 500, reset), then format (ByteArray, Json, Avro,
     * Parquet), then the three requests (create, upload part, complete) with the small flush
     * size followed by the same three requests with a larger flush size.
     *
     * @return the rows as a fixed-size list
     */
    @Parameterized.Parameters
    public static Collection<Object[]> tests() {
        // Within each group the order is: create, upload part, complete.
        final Failures[][] faultGroups = {
            {
                Failures.FAIL_CREATE_MULTIPART_UPLOAD_REQUEST_WITH_ERROR_429,
                Failures.FAIL_UPLOAD_PART_REQUEST_WITH_ERROR_429,
                Failures.FAIL_COMPLETE_MULTIPART_UPLOAD_REQUEST_WITH_ERROR_429
            },
            {
                Failures.FAIL_CREATE_MULTIPART_UPLOAD_REQUEST_WITH_ERROR_500,
                Failures.FAIL_UPLOAD_PART_REQUEST_WITH_ERROR_500,
                Failures.FAIL_COMPLETE_MULTIPART_UPLOAD_REQUEST_WITH_ERROR_500
            },
            {
                Failures.FAIL_CREATE_MULTIPART_UPLOAD_REQUEST_WITH_RESET,
                Failures.FAIL_UPLOAD_PART_REQUEST_WITH_RESET,
                Failures.FAIL_COMPLETE_MULTIPART_UPLOAD_REQUEST_WITH_RESET
            }
        };
        // Each pair is {format class, value converter class}.
        final Class<?>[][] formatPairs = {
            {ByteArrayFormat.class, ByteArrayConverter.class},
            {JsonFormat.class, ByteArrayConverter.class},
            {AvroFormat.class, ByteArrayConverter.class},
            {ParquetFormat.class, JsonConverter.class}
        };

        final int rowsPerPair = 2 * 3;
        final Object[][] rows = new Object[faultGroups.length * formatPairs.length * rowsPerPair][];
        int next = 0;

        for (Failures[] group : faultGroups) {
            for (Class<?>[] pair : formatPairs) {
                final boolean parquet = pair[0] == ParquetFormat.class;

                // First pass: every request with the small flush size.
                for (Failures fault : group) {
                    rows[next++] = new Object[]{pair[0], pair[1], fault, FLUSH_SIZE_SMALL};
                }

                // Second pass: every request with a larger flush size. Parquet uses
                // FLUSH_SIZE_HUGE for create and upload-part but FLUSH_SIZE_BIG for complete;
                // NOTE(review): this asymmetry is carried over unchanged from the original
                // matrix, confirm it is intentional.
                for (int step = 0; step < group.length; step++) {
                    final boolean completeStep = step == group.length - 1;
                    final int largeFlush = (parquet && !completeStep) ? FLUSH_SIZE_HUGE : FLUSH_SIZE_BIG;
                    rows[next++] = new Object[]{pair[0], pair[1], group[step], largeFlush};
                }
            }
        }
        return Arrays.asList(rows);
    }

    /**
     * Injects the configured fault, produces exactly {@code flushSize} records to partition 0
     * of the test topic, and waits for a file to appear in the bucket.
     *
     * <p>{@code createProps()} sets {@code s3.part.retries} to 0, so (as the test name
     * indicates) the file is presumably expected to show up only because the Connect
     * framework retries the failed write.
     *
     * @throws Exception if producing fails or the file does not appear within
     *     {@code S3_TIMEOUT_MS}
     */
    @Test
    public void testErrorIsRetriedByConnectFramework() throws Exception {
        this.failure.inject();

        // Step by one: the record count must equal flushSize regardless of MAX_TASKS.
        for (int sent = 0; sent < this.flushSize; sent++) {
            connect.kafka().produce(this.topicName, 0, (String) null, TEST_MESSAGES[sent % TEST_MESSAGES.length]);
        }

        // flushSize records on a single partition with flush.size == flushSize yield one
        // file; this does not depend on the number of tasks.
        final int expectedFileCount = 1;
        S3Utils.waitForFilesInBucket(this.s3, "kafka.bucket", expectedFileCount, S3_TIMEOUT_MS);
    }

    /**
     * Generates {@code i} independent test messages.
     *
     * @param i number of messages to generate
     * @return an array of length {@code i} with every slot filled
     */
    private static String[] generateTestMessages(int i) {
        final String[] messages = new String[i];
        // Step by one so every slot is filled; stepping by MAX_TASKS would leave null
        // entries as soon as MAX_TASKS is greater than 1.
        for (int index = 0; index < messages.length; index++) {
            messages[index] = generateTestMessage();
        }
        return messages;
    }

    /**
     * Generates one test message: a 102400-character random string wrapped in a JSON
     * schema-and-payload envelope.
     *
     * @return the envelope serialized as a JSON string
     */
    private static String generateTestMessage() {
        final int payloadLength = 102400;
        final String payload = generateLongString(payloadLength);
        return envelopeStringToJsonWithSchema(payload);
    }

    /**
     * Produces a random string via {@code RandomStringUtils.random}.
     *
     * @param i length argument passed to {@code RandomStringUtils.random}
     * @return the generated string
     */
    private static String generateLongString(int i) {
        final String randomChars = RandomStringUtils.random(i);
        return randomChars;
    }

    /**
     * Wraps a string in a JSON envelope with a {@code payload} and a {@code schema} member,
     * in that order. The schema is a struct with one non-optional string field named
     * {@code "string"}; the payload stores {@code str} under that same name.
     *
     * @param str value to place in the payload
     * @return the envelope serialized as a JSON string
     * @throws RuntimeException wrapping the {@code JsonProcessingException} if serialization fails
     */
    private static String envelopeStringToJsonWithSchema(String str) {
        // Kept local on purpose: this method runs while TEST_MESSAGES is being initialised,
        // so a static mapper declared later in the class would still be null at that point.
        final ObjectMapper mapper = new ObjectMapper();

        // Schema: a struct holding a single required string field called "string".
        final ObjectNode fieldSchema = mapper.createObjectNode()
            .put("type", "string")
            .put("optional", false)
            .put("field", "string");
        final ArrayNode fieldSchemas = mapper.createArrayNode();
        fieldSchemas.add(fieldSchema);
        final ObjectNode schema = mapper.createObjectNode();
        schema.put("type", "struct");
        schema.set("fields", fieldSchemas);

        // Payload: the value stored under the field name declared in the schema.
        final ObjectNode payload = mapper.createObjectNode();
        payload.put("string", str);

        // Envelope: payload first, then schema, which fixes the member order of the output.
        final ObjectNode envelope = mapper.createObjectNode();
        envelope.set("payload", payload);
        envelope.set("schema", schema);

        try {
            return mapper.writeValueAsString(envelope);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}
