package org.apache.kafka.connect.integration;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/LogEventsIntegrationTest.class */
public class LogEventsIntegrationTest {
    private static final int NUM_WORKERS = 2;
    private static final String FOO_TOPIC = "foo-topic";
    private static final String FOO_CONNECTOR = "foo-source";
    private static final int NUM_TOPIC_PARTITIONS = 3;
    private static final long CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
    private EmbeddedConnectCluster.Builder connectBuilder;
    private EmbeddedConnectCluster connect;
    Map<String, String> workerProps = new HashMap();
    Properties brokerProps = new Properties();

    /* loaded from: input_file:org/apache/kafka/connect/integration/LogEventsIntegrationTest$FaultySourceConnector.class */
    public static class FaultySourceConnector extends MonitorableSourceConnector {

        /* loaded from: input_file:org/apache/kafka/connect/integration/LogEventsIntegrationTest$FaultySourceConnector$FaultySourceTask.class */
        public static class FaultySourceTask extends MonitorableSourceConnector.MonitorableSourceTask {
            private static final int NUM_ITERATIONS_BEFORE_FAILURE = 2;
            private int numIterations = 0;

            @Override // org.apache.kafka.connect.integration.MonitorableSourceConnector.MonitorableSourceTask
            public List<SourceRecord> poll() {
                List<SourceRecord> poll = super.poll();
                int i = this.numIterations;
                this.numIterations = i + 1;
                if (i < NUM_ITERATIONS_BEFORE_FAILURE) {
                    return poll;
                }
                throw new ConnectException(String.format("Configured iterations - %d completed. Connector Task %s failing.", Integer.valueOf(NUM_ITERATIONS_BEFORE_FAILURE), taskId()));
            }
        }

        @Override // org.apache.kafka.connect.integration.MonitorableSourceConnector, org.apache.kafka.connect.runtime.SampleSourceConnector
        public Class<? extends Task> taskClass() {
            return FaultySourceTask.class;
        }
    }

    @Before
    public void setup() {
        this.workerProps.put("connector.client.config.override.policy", "All");
        this.brokerProps.put("auto.create.topics.enable", String.valueOf(false));
        this.connectBuilder = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(NUM_WORKERS).workerProps(this.workerProps).brokerProps(this.brokerProps).maskExitProcedures(true);
    }

    @After
    public void close() {
        this.connect.stop();
    }

    @Test
    public void testConnectWithConnectEventLogsDisabled() throws InterruptedException {
        this.workerProps.put("confluent.event.logger.enable", "false");
        startConnectorAndWaitForTaskFailure();
        this.connect.assertions().assertTopicsDoNotExist("confluent-connect-log-events");
    }

    @Test
    public void testConnectorWithConnectLogEventsEnabled() throws InterruptedException {
        this.workerProps.put("confluent.event.logger.enable", "true");
        this.workerProps.put("confluent.event.logger.exporter.kafka.topic.replicas", "1");
        startConnectorAndWaitForTaskFailure();
        this.connect.assertions().assertTopicsExist("confluent-connect-log-events");
        this.connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(FOO_CONNECTOR, 1, "Faulty task did not fail after configured iterations");
        Assert.assertEquals(1L, this.connect.kafka().consume(1, CONSUME_MAX_DURATION_MS, "confluent-connect-log-events").count());
    }

    private void startConnectorAndWaitForTaskFailure() throws InterruptedException {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        Assert.assertFalse(this.connect.kafka().describeTopics("confluent-connect-log-events").get("confluent-connect-log-events").isPresent());
        this.connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
        this.connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
        this.connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for connector: foo-source");
        this.connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProperties());
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
            Assert.fail("Test thread woke up before connector could fail");
        }
    }

    private Map<String, String> defaultSourceConnectorProperties() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(1));
        hashMap.put(MonitorableSourceConnector.TOPIC_CONFIG, FOO_TOPIC);
        hashMap.put(MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG, String.valueOf(10));
        hashMap.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, String.valueOf(10));
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        hashMap.put("topic.creation.default.replication.factor", String.valueOf(1));
        hashMap.put("topic.creation.default.partitions", String.valueOf(1));
        hashMap.put("connector.class", FaultySourceConnector.class.getSimpleName());
        return hashMap;
    }
}
