package org.apache.kafka.connect.runtime;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PrepareForTest({WorkerSinkTask.class, WorkerSourceTask.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.class */
public class ErrorHandlingTaskTest {
    // Topic and partition/offset coordinates used for every record built by the tests.
    private static final String TOPIC = "test";
    private static final int PARTITION1 = 12;
    private static final int PARTITION2 = 13;
    private static final long FIRST_OFFSET = 45;

    @Mock
    Plugins plugins;

    // Task properties; filled in by the static initializer and also used to build TASK_CONFIG.
    private static final Map<String, String> TASK_PROPS = new HashMap<>();

    // Settings handed to every RetryWithToleranceOperator created by operator().
    public static final long OPERATOR_RETRY_TIMEOUT_MILLIS = 60000;
    public static final long OPERATOR_RETRY_MAX_DELAY_MILLIS = 5000;
    // Both constants below are assigned in the static initializer.
    public static final ToleranceType OPERATOR_TOLERANCE_TYPE;
    private static final TaskConfig TASK_CONFIG;

    // Re-created for each test in setup().
    private Time time;
    private MockConnectMetrics metrics;

    @Mock
    private SinkTask sinkTask;

    @Mock
    private SourceTask sourceTask;
    private WorkerConfig workerConfig;

    // Annotated as a mock, but setup() also assigns a fresh PowerMock mock to this field.
    @Mock
    private PluginClassLoader pluginLoader;

    @Mock
    private HeaderConverter headerConverter;

    // Partial mocks built by createSinkTask(...) / createSourceTask(...).
    private WorkerSinkTask workerSinkTask;
    private WorkerSourceTask workerSourceTask;

    @Mock
    private KafkaConsumer<byte[], byte[]> consumer;

    @Mock
    private KafkaProducer<byte[], byte[]> producer;

    @Mock
    OffsetStorageReader offsetReader;

    @Mock
    OffsetStorageWriter offsetWriter;

    @Mock
    private TaskStatus.Listener statusListener;

    // Metrics that the tests assert on; bound to taskId in setup().
    private ErrorHandlingMetrics errorHandlingMetrics;
    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
    private TargetState initialState = TargetState.STARTED;

    // Captures filled while the sink task is initialized (see expectInitializeTask()).
    private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
    private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/ErrorHandlingTaskTest$FaultyConverter.class */
    public static class FaultyConverter extends JsonConverter {
        private static final Logger log = LoggerFactory.getLogger(FaultyConverter.class);
        private int invocations = 0;

        FaultyConverter() {
        }

        public byte[] fromConnectData(String str, Schema schema, Object obj) {
            if (obj == null) {
                return super.fromConnectData(str, schema, (Object) null);
            }
            this.invocations++;
            if (this.invocations % 3 == 0) {
                log.debug("Succeeding record: {} where invocations={}", obj, Integer.valueOf(this.invocations));
                return super.fromConnectData(str, schema, obj);
            }
            log.debug("Failing record: {} at invocations={}", obj, Integer.valueOf(this.invocations));
            throw new RetriableException("Bad invocations " + this.invocations + " for mod 3");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/ErrorHandlingTaskTest$FaultyPassthrough.class */
    public static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R> {
        private static final Logger log = LoggerFactory.getLogger(FaultyPassthrough.class);
        private static final String MOD_CONFIG = "mod";
        private static final int MOD_CONFIG_DEFAULT = 3;
        public static final ConfigDef CONFIG_DEF = new ConfigDef().define(MOD_CONFIG, ConfigDef.Type.INT, Integer.valueOf(MOD_CONFIG_DEFAULT), ConfigDef.Importance.MEDIUM, "Pass records without failure only if timestamp % mod == 0");
        private int mod = MOD_CONFIG_DEFAULT;
        private int invocations = 0;

        FaultyPassthrough() {
        }

        public R apply(R r) {
            this.invocations++;
            if (this.invocations % this.mod == 0) {
                log.debug("Succeeding record: {} where invocations={}", r, Integer.valueOf(this.invocations));
                return r;
            }
            log.debug("Failing record: {} at invocations={}", r, Integer.valueOf(this.invocations));
            throw new RetriableException("Bad invocations " + this.invocations + " for mod " + this.mod);
        }

        public ConfigDef config() {
            return CONFIG_DEF;
        }

        public void close() {
            log.info("Shutting down transform");
        }

        public void configure(Map<String, ?> map) {
            this.mod = Math.max(new SimpleConfig(CONFIG_DEF, map).getInt(MOD_CONFIG).intValue(), 2);
            log.info("Configuring {}. Setting mod to {}", getClass(), Integer.valueOf(this.mod));
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/ErrorHandlingTaskTest$TestSinkTask.class */
    /**
     * Abstract {@link SinkTask} subtype; this file uses only its class name, as the
     * {@code task.class} task property.
     */
    private abstract static class TestSinkTask extends SinkTask {
        private TestSinkTask() {
            super();
        }
    }

    /**
     * Creates the per-test fixtures: a mock clock, mock metrics, a standalone worker
     * configuration that uses JSON converters, a mocked plugin class loader and the
     * error-handling metrics for {@code taskId}.
     */
    @Before
    public void setup() {
        this.time = new MockTime(0L, 0L, 0L);
        this.metrics = new MockConnectMetrics();

        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("internal.key.converter.schemas.enable", "false");
        workerProps.put("internal.value.converter.schemas.enable", "false");
        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");

        this.pluginLoader = PowerMock.createMock(PluginClassLoader.class);
        this.workerConfig = new StandaloneConfig(workerProps);
        this.errorHandlingMetrics = new ErrorHandlingMetrics(this.taskId, this.metrics);
    }

    /** Stops the mock metrics if {@link #setup()} got far enough to create them. */
    @After
    public void tearDown() {
        if (this.metrics == null) {
            return;
        }
        this.metrics.stop();
    }

    /**
     * Drives two iterations of a sink task whose transformation chain contains a
     * {@link FaultyPassthrough}. The first polled record carries well-formed JSON; the
     * second carries malformed JSON (the colon is missing). The test then checks the sink
     * and error-handling metrics.
     */
    @Test
    public void testErrorHandlingInSinkTasks() throws Exception {
        Map<String, String> reportProps = new HashMap<>();
        reportProps.put("errors.log.enable", "true");
        reportProps.put("errors.log.include.messages", "true");
        LogReporter reporter = new LogReporter(this.taskId, connConfig(reportProps));
        reporter.metrics(this.errorHandlingMetrics);

        RetryWithToleranceOperator retryOperator = operator();
        retryOperator.metrics(this.errorHandlingMetrics);
        retryOperator.reporters(Collections.singletonList(reporter));
        createSinkTask(this.initialState, retryOperator);

        expectInitializeTask();

        // The key is a plain null so the diamond can infer <byte[], byte[]>; the payload is
        // encoded with an explicit charset instead of the platform default.
        ConsumerRecord<byte[], byte[]> wellFormed = new ConsumerRecord<>(
                TOPIC, PARTITION1, FIRST_OFFSET, null, "{\"a\": 10}".getBytes(StandardCharsets.UTF_8));
        // Malformed JSON: no colon between the field name and its value.
        ConsumerRecord<byte[], byte[]> malformed = new ConsumerRecord<>(
                TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes(StandardCharsets.UTF_8));

        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(records(wellFormed));
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(records(malformed));

        this.sinkTask.put(EasyMock.anyObject());
        EasyMock.expectLastCall().times(2);

        PowerMock.replayAll();

        this.workerSinkTask.initialize(TASK_CONFIG);
        this.workerSinkTask.initializeAndStart();
        this.workerSinkTask.iteration();
        this.workerSinkTask.iteration();

        // Two records were read but only one was sent on to the task.
        assertSinkMetricValue("sink-record-read-total", 2.0d);
        assertSinkMetricValue("sink-record-send-total", 1.0d);
        // Presumably: 2 failed attempts in FaultyPassthrough (mod 3) for the first record,
        // plus 1 failure for the malformed record, which is then counted as an error and skipped.
        assertErrorHandlingMetricValue("total-record-errors", 1.0d);
        assertErrorHandlingMetricValue("total-record-failures", 3.0d);
        assertErrorHandlingMetricValue("total-records-skipped", 1.0d);

        PowerMock.verifyAll();
    }

    /**
     * Builds a retry operator from the {@code OPERATOR_*} constants.
     *
     * @return a new operator; note that it is given {@link Time#SYSTEM}, not the test's mock clock
     */
    private RetryWithToleranceOperator operator() {
        final Time clock = Time.SYSTEM;
        return new RetryWithToleranceOperator(
                OPERATOR_RETRY_TIMEOUT_MILLIS,
                OPERATOR_RETRY_MAX_DELAY_MILLIS,
                OPERATOR_TOLERANCE_TYPE,
                clock);
    }

    /**
     * Runs a source task that polls two records through a {@link FaultyPassthrough}
     * transformation with a regular JSON converter, then checks the source and
     * error-handling metrics.
     */
    @Test
    public void testErrorHandlingInSourceTasks() throws Exception {
        Map<String, String> reportProps = new HashMap<>();
        reportProps.put("errors.log.enable", "true");
        reportProps.put("errors.log.include.messages", "true");
        LogReporter reporter = new LogReporter(this.taskId, connConfig(reportProps));
        reporter.metrics(this.errorHandlingMetrics);

        RetryWithToleranceOperator retryOperator = operator();
        retryOperator.metrics(this.errorHandlingMetrics);
        retryOperator.reporters(Collections.singletonList(reporter));
        createSourceTask(this.initialState, retryOperator);

        Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
        SourceRecord first = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(),
                TOPIC, PARTITION1, valSchema, new Struct(valSchema).put("val", 1234));
        SourceRecord second = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(),
                TOPIC, PARTITION1, valSchema, new Struct(valSchema).put("val", 6789));

        // isStopping() answers false twice and then true.
        EasyMock.expect(this.workerSourceTask.isStopping()).andReturn(false);
        EasyMock.expect(this.workerSourceTask.isStopping()).andReturn(false);
        EasyMock.expect(this.workerSourceTask.isStopping()).andReturn(true);

        EasyMock.expect(this.workerSourceTask.commitOffsets()).andReturn(true);

        this.offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject());
        EasyMock.expectLastCall().times(2);

        this.sourceTask.initialize(EasyMock.anyObject());
        EasyMock.expectLastCall();

        this.sourceTask.start(EasyMock.anyObject());
        EasyMock.expectLastCall();

        EasyMock.expect(this.sourceTask.poll()).andReturn(Collections.singletonList(first));
        EasyMock.expect(this.sourceTask.poll()).andReturn(Collections.singletonList(second));
        // An untyped null takes the Future type of the expectation; an Object-typed null does not.
        EasyMock.expect(this.producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2);

        PowerMock.replayAll();

        this.workerSourceTask.initialize(TASK_CONFIG);
        this.workerSourceTask.execute();

        assertSourceMetricValue("source-record-poll-total", 2.0d);
        // Presumably zero because the mocked producer never completes a send.
        assertSourceMetricValue("source-record-write-total", 0.0d);
        assertErrorHandlingMetricValue("total-record-errors", 0.0d);
        // FaultyPassthrough (mod 3) fails twice per record before letting it through: 2 x 2.
        assertErrorHandlingMetricValue("total-record-failures", 4.0d);
        assertErrorHandlingMetricValue("total-records-skipped", 0.0d);

        PowerMock.verifyAll();
    }

    /**
     * Builds a connector configuration with a fixed name and connector class, overlaid
     * with the given properties.
     *
     * @param map extra properties; entries here override the two fixed ones on key collision
     * @return the resulting connector configuration
     */
    private ConnectorConfig connConfig(Map<String, String> map) {
        Map<String, String> props = new HashMap<>();
        props.put("name", TOPIC);
        props.put("connector.class", SinkTask.class.getName());
        props.putAll(map);
        return new ConnectorConfig(this.plugins, props);
    }

    /**
     * Same scenario as the plain source-task test, but the task is built with the
     * {@link FaultyConverter} from {@link #badConverter()}, so failures come from both the
     * transformation and the converter.
     */
    @Test
    public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
        Map<String, String> reportProps = new HashMap<>();
        reportProps.put("errors.log.enable", "true");
        reportProps.put("errors.log.include.messages", "true");
        LogReporter reporter = new LogReporter(this.taskId, connConfig(reportProps));
        reporter.metrics(this.errorHandlingMetrics);

        RetryWithToleranceOperator retryOperator = operator();
        retryOperator.metrics(this.errorHandlingMetrics);
        retryOperator.reporters(Collections.singletonList(reporter));
        createSourceTask(this.initialState, retryOperator, badConverter());

        Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
        SourceRecord first = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(),
                TOPIC, PARTITION1, valSchema, new Struct(valSchema).put("val", 1234));
        SourceRecord second = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(),
                TOPIC, PARTITION1, valSchema, new Struct(valSchema).put("val", 6789));

        // isStopping() answers false twice and then true.
        EasyMock.expect(this.workerSourceTask.isStopping()).andReturn(false);
        EasyMock.expect(this.workerSourceTask.isStopping()).andReturn(false);
        EasyMock.expect(this.workerSourceTask.isStopping()).andReturn(true);

        EasyMock.expect(this.workerSourceTask.commitOffsets()).andReturn(true);

        this.offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject());
        EasyMock.expectLastCall().times(2);

        this.sourceTask.initialize(EasyMock.anyObject());
        EasyMock.expectLastCall();

        this.sourceTask.start(EasyMock.anyObject());
        EasyMock.expectLastCall();

        EasyMock.expect(this.sourceTask.poll()).andReturn(Collections.singletonList(first));
        EasyMock.expect(this.sourceTask.poll()).andReturn(Collections.singletonList(second));
        // An untyped null takes the Future type of the expectation; an Object-typed null does not.
        EasyMock.expect(this.producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2);

        PowerMock.replayAll();

        this.workerSourceTask.initialize(TASK_CONFIG);
        this.workerSourceTask.execute();

        assertSourceMetricValue("source-record-poll-total", 2.0d);
        // Presumably zero because the mocked producer never completes a send.
        assertSourceMetricValue("source-record-write-total", 0.0d);
        assertErrorHandlingMetricValue("total-record-errors", 0.0d);
        // Per record: 2 failures in FaultyPassthrough plus 2 in FaultyConverter for the
        // non-null value (the null key is not counted by the converter): 2 x (2 + 2).
        assertErrorHandlingMetricValue("total-record-failures", 8.0d);
        assertErrorHandlingMetricValue("total-records-skipped", 0.0d);

        PowerMock.verifyAll();
    }

    /**
     * Asserts that a metric in the sink task's metric group has the expected value
     * (tolerance 0.001).
     *
     * @param name     metric name within the sink task metric group
     * @param expected expected metric value
     */
    private void assertSinkMetricValue(String name, double expected) {
        double measured = this.metrics.currentMetricValueAsDouble(
                this.workerSinkTask.sinkTaskMetricsGroup().metricGroup(), name);
        Assert.assertEquals(expected, measured, 0.001d);
    }

    /**
     * Asserts that a metric in the source task's metric group has the expected value
     * (tolerance 0.001).
     *
     * @param name     metric name within the source task metric group
     * @param expected expected metric value
     */
    private void assertSourceMetricValue(String name, double expected) {
        double measured = this.metrics.currentMetricValueAsDouble(
                this.workerSourceTask.sourceTaskMetricsGroup().metricGroup(), name);
        Assert.assertEquals(expected, measured, 0.001d);
    }

    /**
     * Asserts that a metric in the error-handling metric group has the expected value
     * (tolerance 0.001).
     *
     * @param name     metric name within the error-handling metric group
     * @param expected expected metric value
     */
    private void assertErrorHandlingMetricValue(String name, double expected) {
        double measured = this.metrics.currentMetricValueAsDouble(
                this.errorHandlingMetrics.metricGroup(), name);
        Assert.assertEquals(expected, measured, 0.001d);
    }

    /**
     * Records the expectations for starting the sink task: the partially mocked
     * {@code createConsumer} returns the mock consumer, the consumer subscribes to
     * {@code TOPIC} (the rebalance listener is captured), and the sink task is initialized
     * (its context is captured) and started with {@code TASK_PROPS}.
     *
     * @throws Exception declared because {@code PowerMock.expectPrivate} declares it
     */
    private void expectInitializeTask() throws Exception {
        PowerMock.expectPrivate(this.workerSinkTask, "createConsumer").andReturn(this.consumer);

        this.consumer.subscribe(
                EasyMock.eq(Collections.singletonList(TOPIC)),
                EasyMock.capture(this.rebalanceListener));
        PowerMock.expectLastCall();

        this.sinkTask.initialize(EasyMock.capture(this.sinkTaskContext));
        PowerMock.expectLastCall();

        this.sinkTask.start(TASK_PROPS);
        PowerMock.expectLastCall();
    }

    /**
     * Builds {@code workerSinkTask} as a partial mock in which only {@code createConsumer}
     * is mocked. One schemaless JSON converter serves as both key and value converter, and
     * the transformation chain holds a single {@link FaultyPassthrough}.
     *
     * @param targetState                initial target state handed to the task
     * @param retryWithToleranceOperator operator shared by the chain and the task
     */
    private void createSinkTask(TargetState targetState, RetryWithToleranceOperator retryWithToleranceOperator) {
        Map<String, Object> converterProps = this.workerConfig.originalsWithPrefix("value.converter.");
        converterProps.put("converter.type", "value");
        converterProps.put("schemas.enable", "false");
        JsonConverter converter = new JsonConverter();
        converter.configure(converterProps);

        TransformationChain<SinkRecord> sinkTransforms = new TransformationChain<>(
                Collections.singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);

        // The trailing arguments are forwarded, in this order, to the WorkerSinkTask constructor.
        this.workerSinkTask = PowerMock.createPartialMock(
                WorkerSinkTask.class, new String[]{"createConsumer"},
                this.taskId, this.sinkTask, this.statusListener, targetState, this.workerConfig,
                ClusterConfigState.EMPTY, this.metrics, converter, converter, this.headerConverter,
                sinkTransforms, this.pluginLoader, this.time, retryWithToleranceOperator);
    }

    /**
     * Builds {@code workerSourceTask} with a schemaless JSON value converter.
     *
     * @param targetState                initial target state handed to the task
     * @param retryWithToleranceOperator operator shared by the chain and the task
     */
    private void createSourceTask(TargetState targetState, RetryWithToleranceOperator retryWithToleranceOperator) {
        Map<String, Object> converterProps = this.workerConfig.originalsWithPrefix("value.converter.");
        converterProps.put("converter.type", "value");
        converterProps.put("schemas.enable", "false");
        JsonConverter converter = new JsonConverter();
        converter.configure(converterProps);

        createSourceTask(targetState, retryWithToleranceOperator, converter);
    }

    /**
     * Creates a {@link FaultyConverter} configured as a schemaless value converter.
     *
     * @return the configured faulty converter
     */
    private Converter badConverter() {
        Map<String, Object> converterProps = this.workerConfig.originalsWithPrefix("value.converter.");
        converterProps.put("converter.type", "value");
        converterProps.put("schemas.enable", "false");

        FaultyConverter converter = new FaultyConverter();
        converter.configure(converterProps);
        return converter;
    }

    /**
     * Builds {@code workerSourceTask} as a partial mock in which {@code commitOffsets} and
     * {@code isStopping} are mocked. The given converter serves as both key and value
     * converter, and the transformation chain holds a single {@link FaultyPassthrough}.
     *
     * @param targetState                initial target state handed to the task
     * @param retryWithToleranceOperator operator shared by the chain and the task
     * @param converter                  converter used for both keys and values
     */
    private void createSourceTask(TargetState targetState, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
        TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(
                Collections.singletonList(new FaultyPassthrough<SourceRecord>()), retryWithToleranceOperator);

        // The trailing arguments are forwarded, in this order, to the WorkerSourceTask constructor.
        this.workerSourceTask = PowerMock.createPartialMock(
                WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"},
                this.taskId, this.sourceTask, this.statusListener, targetState, converter, converter,
                this.headerConverter, sourceTransforms, this.producer, this.offsetReader, this.offsetWriter,
                this.workerConfig, ClusterConfigState.EMPTY, this.metrics, this.pluginLoader, this.time,
                retryWithToleranceOperator);
    }

    /**
     * Wraps one record in a {@link ConsumerRecords} batch keyed by the record's own
     * topic and partition.
     *
     * @param consumerRecord the only record in the batch
     * @return a batch holding exactly that record
     */
    private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> consumerRecord) {
        TopicPartition owner = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        return new ConsumerRecords<>(
                Collections.singletonMap(owner, Collections.singletonList(consumerRecord)));
    }

    // Assigns the static finals that are declared without an initializer.
    static {
        OPERATOR_TOLERANCE_TYPE = ToleranceType.ALL;

        // TASK_PROPS is filled before TASK_CONFIG is constructed from it.
        TASK_PROPS.put("task.class", TestSinkTask.class.getName());
        TASK_PROPS.put("topics", TOPIC);
        TASK_CONFIG = new TaskConfig(TASK_PROPS);
    }
}
