package org.apache.kafka.connect.runtime;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.runtime.AbstractWorkerSourceTask;
import org.apache.kafka.connect.runtime.SubmittedRecords;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.class */
public class AbstractWorkerSourceTaskTest {
    private static final String TOPIC = "topic";
    private static final String OTHER_TOPIC = "other-topic";
    private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
    private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
    private static final Integer KEY = -1;
    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
    private static final Long RECORD = 12L;
    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
    private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();

    @Mock
    private SourceTask sourceTask;

    @Mock
    private TopicAdmin admin;

    @Mock
    private KafkaProducer<byte[], byte[]> producer;

    @Mock
    private Converter keyConverter;

    @Mock
    private Converter valueConverter;

    @Mock
    private HeaderConverter headerConverter;

    @Mock
    private TransformationChain<SourceRecord> transformationChain;

    @Mock
    private CloseableOffsetStorageReader offsetReader;

    @Mock
    private OffsetStorageWriter offsetWriter;

    @Mock
    private ConnectorOffsetBackingStore offsetStore;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private WorkerSourceTaskContext sourceTaskContext;

    @Mock
    private TaskStatus.Listener statusListener;
    private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
    private final ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
    private Plugins plugins;
    private WorkerConfig config;
    private SourceConnectorConfig sourceConfig;
    private MockConnectMetrics metrics;

    @Mock
    private ErrorHandlingMetrics errorHandlingMetrics;
    private AbstractWorkerSourceTask workerTask;

    @Before
    public void setup() {
        Map<String, String> workerProps = workerProps();
        this.plugins = new Plugins(workerProps);
        this.config = new StandaloneConfig(workerProps);
        this.sourceConfig = new SourceConnectorConfig(this.plugins, sourceConnectorPropsWithGroups(), true);
        this.metrics = new MockConnectMetrics();
    }

    private Map<String, String> workerProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("offset.storage.file.filename", "/tmp/connect.offsets");
        hashMap.put("topic.creation.enable", "true");
        return hashMap;
    }

    private Map<String, String> sourceConnectorPropsWithGroups() {
        HashMap hashMap = new HashMap();
        hashMap.put("name", "foo-connector");
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(1));
        hashMap.put("topic", "topic");
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        hashMap.put("topic.creation.groups", String.join(",", "foo", "bar"));
        hashMap.put("topic.creation.default.replication.factor", String.valueOf(1));
        hashMap.put("topic.creation.default.partitions", String.valueOf(1));
        hashMap.put("topic.creation.foo.include", "topic");
        hashMap.put("topic.creation.bar.include", ".*");
        hashMap.put("topic.creation.bar.exclude", "topic");
        return hashMap;
    }

    @After
    public void tearDown() {
        if (this.metrics != null) {
            this.metrics.stop();
        }
        Mockito.verifyNoMoreInteractions(new Object[]{this.statusListener});
    }

    @Test
    public void testMetricsGroup() {
        AbstractWorkerSourceTask.SourceTaskMetricsGroup sourceTaskMetricsGroup = new AbstractWorkerSourceTask.SourceTaskMetricsGroup(this.taskId, this.metrics);
        AbstractWorkerSourceTask.SourceTaskMetricsGroup sourceTaskMetricsGroup2 = new AbstractWorkerSourceTask.SourceTaskMetricsGroup(this.taskId1, this.metrics);
        for (int i = 0; i != 10; i++) {
            sourceTaskMetricsGroup.recordPoll(100, 1000 + (i * 100));
            sourceTaskMetricsGroup.recordWrite(10, 2);
        }
        for (int i2 = 0; i2 != 20; i2++) {
            sourceTaskMetricsGroup2.recordPoll(100, 1000 + (i2 * 100));
            sourceTaskMetricsGroup2.recordWrite(10, 4);
        }
        Assert.assertEquals(1900.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
        Assert.assertEquals(1450.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
        Assert.assertEquals(33.333d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup.metricGroup(), "source-record-poll-rate"), 0.001d);
        Assert.assertEquals(1000.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup.metricGroup(), "source-record-poll-total"), 0.001d);
        Assert.assertEquals(2.666d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup.metricGroup(), "source-record-write-rate"), 0.001d);
        Assert.assertEquals(80.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup.metricGroup(), "source-record-write-total"), 0.001d);
        Assert.assertEquals(900.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup.metricGroup(), "source-record-active-count"), 0.001d);
        sourceTaskMetricsGroup.close();
        Iterator it = sourceTaskMetricsGroup.metricGroup().metrics().metrics().keySet().iterator();
        while (it.hasNext()) {
            Assert.assertFalse(sourceTaskMetricsGroup.metricGroup().groupId().includes((MetricName) it.next()));
        }
        Assert.assertNull(sourceTaskMetricsGroup.metricGroup().metrics().getSensor("sink-record-read"));
        Assert.assertNull(sourceTaskMetricsGroup.metricGroup().metrics().getSensor("sink-record-send"));
        Assert.assertNull(sourceTaskMetricsGroup.metricGroup().metrics().getSensor("sink-record-active-count"));
        Assert.assertNull(sourceTaskMetricsGroup.metricGroup().metrics().getSensor("partition-count"));
        Assert.assertNull(sourceTaskMetricsGroup.metricGroup().metrics().getSensor("offset-seq-number"));
        Assert.assertNull(sourceTaskMetricsGroup.metricGroup().metrics().getSensor("offset-commit-completion"));
        Assert.assertNull(sourceTaskMetricsGroup.metricGroup().metrics().getSensor("offset-commit-completion-skip"));
        Assert.assertNull(sourceTaskMetricsGroup.metricGroup().metrics().getSensor("put-batch-time"));
        Assert.assertEquals(2900.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup2.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
        Assert.assertEquals(1950.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup2.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
        Assert.assertEquals(66.667d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup2.metricGroup(), "source-record-poll-rate"), 0.001d);
        Assert.assertEquals(2000.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup2.metricGroup(), "source-record-poll-total"), 0.001d);
        Assert.assertEquals(4.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup2.metricGroup(), "source-record-write-rate"), 0.001d);
        Assert.assertEquals(120.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup2.metricGroup(), "source-record-write-total"), 0.001d);
        Assert.assertEquals(1800.0d, this.metrics.currentMetricValueAsDouble(sourceTaskMetricsGroup2.metricGroup(), "source-record-active-count"), 0.001d);
    }

    @Test
    public void testSendRecordsConvertsData() {
        createWorkerTask();
        List singletonList = Collections.singletonList(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
        expectSendRecord(emptyHeaders());
        expectTopicCreation("topic");
        this.workerTask.toSend = singletonList;
        this.workerTask.sendRecords();
        ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord = verifySendRecord();
        Assert.assertArrayEquals(SERIALIZED_KEY, (byte[]) ((ProducerRecord) verifySendRecord.getValue()).key());
        Assert.assertArrayEquals(SERIALIZED_RECORD, (byte[]) ((ProducerRecord) verifySendRecord.getValue()).value());
        verifyTaskGetTopic();
        verifyTopicCreation();
    }

    @Test
    public void testSendRecordsPropagatesTimestamp() {
        Long valueOf = Long.valueOf(System.currentTimeMillis());
        createWorkerTask();
        expectSendRecord(emptyHeaders());
        expectTopicCreation("topic");
        this.workerTask.toSend = Collections.singletonList(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, valueOf));
        this.workerTask.sendRecords();
        Assert.assertEquals(valueOf, ((ProducerRecord) verifySendRecord().getValue()).timestamp());
        verifyTaskGetTopic();
        verifyTopicCreation();
    }

    @Test
    public void testSendRecordsCorruptTimestamp() {
        createWorkerTask();
        expectSendRecord(emptyHeaders());
        this.workerTask.toSend = Collections.singletonList(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, -3L));
        AbstractWorkerSourceTask abstractWorkerSourceTask = this.workerTask;
        abstractWorkerSourceTask.getClass();
        Assert.assertThrows(InvalidRecordException.class, abstractWorkerSourceTask::sendRecords);
        Mockito.verifyNoInteractions(new Object[]{this.producer});
        Mockito.verifyNoInteractions(new Object[]{this.admin});
    }

    @Test
    public void testSendRecordsNoTimestamp() {
        createWorkerTask();
        expectSendRecord(emptyHeaders());
        expectTopicCreation("topic");
        this.workerTask.toSend = Collections.singletonList(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, -1L));
        this.workerTask.sendRecords();
        Assert.assertNull(((ProducerRecord) verifySendRecord().getValue()).timestamp());
        verifyTaskGetTopic();
        verifyTopicCreation();
    }

    @Test
    public void testHeaders() {
        Headers add = new RecordHeaders().add("header_key", "header_value".getBytes());
        org.apache.kafka.connect.header.Headers add2 = new ConnectHeaders().add("header_key", new SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));
        createWorkerTask();
        expectSendRecord(add);
        expectTopicCreation("topic");
        this.workerTask.toSend = Collections.singletonList(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, (Long) null, add2));
        this.workerTask.sendRecords();
        ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord = verifySendRecord();
        Assert.assertArrayEquals(SERIALIZED_KEY, (byte[]) ((ProducerRecord) verifySendRecord.getValue()).key());
        Assert.assertArrayEquals(SERIALIZED_RECORD, (byte[]) ((ProducerRecord) verifySendRecord.getValue()).value());
        Assert.assertEquals(add, ((ProducerRecord) verifySendRecord.getValue()).headers());
        verifyTaskGetTopic();
        verifyTopicCreation();
    }

    @Test
    public void testHeadersWithCustomConverter() throws Exception {
        StringConverter stringConverter = new StringConverter();
        createWorkerTask(stringConverter, new SampleConverterWithHeaders(), stringConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, Collections::emptyList);
        expectSendRecord(null);
        expectTopicCreation("topic");
        this.workerTask.toSend = Arrays.asList(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, "Árvíztűrő tükörfúrógép", (Long) null, new ConnectHeaders().addString("encoding", "latin2")), new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, "Тестовое сообщение", (Long) null, new ConnectHeaders().addString("encoding", "koi8_r")));
        this.workerTask.sendRecords();
        List allValues = verifySendRecord(2).getAllValues();
        Assert.assertEquals(2L, allValues.size());
        ProducerRecord producerRecord = (ProducerRecord) allValues.get(0);
        ProducerRecord producerRecord2 = (ProducerRecord) allValues.get(1);
        Assert.assertEquals(ByteBuffer.wrap("a".getBytes()), ByteBuffer.wrap((byte[]) producerRecord.key()));
        Assert.assertEquals(ByteBuffer.wrap("Árvíztűrő tükörfúrógép".getBytes("latin2")), ByteBuffer.wrap((byte[]) producerRecord.value()));
        Assert.assertEquals("latin2", new String(producerRecord.headers().lastHeader("encoding").value()));
        Assert.assertEquals(ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap((byte[]) producerRecord2.key()));
        Assert.assertEquals(ByteBuffer.wrap("Тестовое сообщение".getBytes("koi8_r")), ByteBuffer.wrap((byte[]) producerRecord2.value()));
        Assert.assertEquals("koi8_r", new String(producerRecord2.headers().lastHeader("encoding").value()));
        verifyTaskGetTopic(2);
        verifyTopicCreation();
    }

    @Test
    public void testTopicCreateWhenTopicExists() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectPreliminaryCalls("topic");
        Mockito.when(this.admin.describeTopics(new String[]{"topic"})).thenReturn(Collections.singletonMap("topic", new TopicDescription("topic", false, Collections.singletonList(new TopicPartitionInfo(0, (Node) null, Collections.emptyList(), Collections.emptyList())))));
        expectSendRecord(emptyHeaders());
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        this.workerTask.sendRecords();
        verifySendRecord(2);
        ((TopicAdmin) Mockito.verify(this.admin, Mockito.never())).createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)});
        Mockito.verifyNoMoreInteractions(new Object[]{this.admin});
    }

    @Test
    public void testSendRecordsTopicDescribeRetries() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectPreliminaryCalls("topic");
        Mockito.when(this.admin.describeTopics(new String[]{"topic"})).thenThrow(new Throwable[]{new RetriableException(new TimeoutException("timeout"))}).thenReturn(Collections.emptyMap());
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        this.workerTask.sendRecords();
        Assert.assertEquals(Arrays.asList(sourceRecord, sourceRecord2), this.workerTask.toSend);
        ((TopicAdmin) Mockito.verify(this.admin, Mockito.never())).createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)});
        Mockito.verifyNoMoreInteractions(new Object[]{this.admin});
        expectTopicCreation("topic");
        this.workerTask.sendRecords();
        Assert.assertNull(this.workerTask.toSend);
        verifyTopicCreation();
    }

    @Test
    public void testSendRecordsTopicCreateRetries() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectPreliminaryCalls("topic");
        Mockito.when(this.admin.describeTopics(new String[]{"topic"})).thenReturn(Collections.emptyMap());
        Mockito.when(this.admin.createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)})).thenThrow(new Throwable[]{new RetriableException(new TimeoutException("timeout"))}).thenReturn(createdTopic("topic"));
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        this.workerTask.sendRecords();
        Assert.assertEquals(Arrays.asList(sourceRecord, sourceRecord2), this.workerTask.toSend);
        this.workerTask.sendRecords();
        Assert.assertNull(this.workerTask.toSend);
        verifyTopicCreation(2, "topic", "topic");
    }

    @Test
    public void testSendRecordsTopicDescribeRetriesMidway() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectPreliminaryCalls("topic");
        expectPreliminaryCalls(OTHER_TOPIC);
        Mockito.when(this.admin.describeTopics(new String[]{ArgumentMatchers.anyString()})).thenReturn(Collections.emptyMap()).thenThrow(new Throwable[]{new RetriableException(new TimeoutException("timeout"))}).thenReturn(Collections.emptyMap());
        Mockito.when(this.admin.createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)})).thenAnswer(invocationOnMock -> {
            return createdTopic(((NewTopic) invocationOnMock.getArgument(0)).name());
        });
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2, sourceRecord3);
        this.workerTask.sendRecords();
        Assert.assertEquals(Collections.singletonList(sourceRecord3), this.workerTask.toSend);
        this.workerTask.sendRecords();
        Assert.assertNull(this.workerTask.toSend);
        ((TopicAdmin) Mockito.verify(this.admin, Mockito.times(3))).describeTopics(new String[]{ArgumentMatchers.anyString()});
        ArgumentCaptor forClass = ArgumentCaptor.forClass(NewTopic.class);
        ((TopicAdmin) Mockito.verify(this.admin, Mockito.times(2))).createOrFindTopics(new NewTopic[]{(NewTopic) forClass.capture()});
        Assert.assertEquals(Arrays.asList("topic", OTHER_TOPIC), forClass.getAllValues().stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList()));
    }

    @Test
    public void testSendRecordsTopicCreateRetriesMidway() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectPreliminaryCalls("topic");
        expectPreliminaryCalls(OTHER_TOPIC);
        Mockito.when(this.admin.describeTopics(new String[]{ArgumentMatchers.anyString()})).thenReturn(Collections.emptyMap());
        Mockito.when(this.admin.createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)})).thenReturn(createdTopic("topic")).thenThrow(new Throwable[]{new RetriableException(new TimeoutException("timeout"))}).thenReturn(createdTopic(OTHER_TOPIC));
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2, sourceRecord3);
        this.workerTask.sendRecords();
        Assert.assertEquals(Collections.singletonList(sourceRecord3), this.workerTask.toSend);
        verifyTopicCreation(2, "topic", OTHER_TOPIC);
        this.workerTask.sendRecords();
        Assert.assertNull(this.workerTask.toSend);
        verifyTopicCreation(3, "topic", OTHER_TOPIC, OTHER_TOPIC);
    }

    @Test
    public void testTopicDescribeFails() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectPreliminaryCalls("topic");
        Mockito.when(this.admin.describeTopics(new String[]{"topic"})).thenThrow(new Throwable[]{new ConnectException(new TopicAuthorizationException("unauthorized"))});
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        AbstractWorkerSourceTask abstractWorkerSourceTask = this.workerTask;
        abstractWorkerSourceTask.getClass();
        Assert.assertThrows(ConnectException.class, abstractWorkerSourceTask::sendRecords);
    }

    @Test
    public void testTopicCreateFails() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectPreliminaryCalls("topic");
        Mockito.when(this.admin.describeTopics(new String[]{"topic"})).thenReturn(Collections.emptyMap());
        Mockito.when(this.admin.createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)})).thenThrow(new Throwable[]{new ConnectException(new TopicAuthorizationException("unauthorized"))});
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        AbstractWorkerSourceTask abstractWorkerSourceTask = this.workerTask;
        abstractWorkerSourceTask.getClass();
        Assert.assertThrows(ConnectException.class, abstractWorkerSourceTask::sendRecords);
        ((TopicAdmin) Mockito.verify(this.admin)).createOrFindTopics((NewTopic[]) ArgumentMatchers.any());
        verifyTopicCreation();
    }

    @Test
    public void testTopicCreateFailsWithExceptionWhenCreateReturnsTopicNotCreatedOrFound() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectPreliminaryCalls("topic");
        Mockito.when(this.admin.describeTopics(new String[]{"topic"})).thenReturn(Collections.emptyMap());
        Mockito.when(this.admin.createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)})).thenReturn(TopicAdmin.EMPTY_CREATION);
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        AbstractWorkerSourceTask abstractWorkerSourceTask = this.workerTask;
        abstractWorkerSourceTask.getClass();
        Assert.assertThrows(ConnectException.class, abstractWorkerSourceTask::sendRecords);
        ((TopicAdmin) Mockito.verify(this.admin)).createOrFindTopics((NewTopic[]) ArgumentMatchers.any());
        verifyTopicCreation();
    }

    @Test
    public void testTopicCreateSucceedsWhenCreateReturnsExistingTopicFound() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectSendRecord(emptyHeaders());
        Mockito.when(this.admin.describeTopics(new String[]{"topic"})).thenReturn(Collections.emptyMap());
        Mockito.when(this.admin.createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)})).thenReturn(foundTopic("topic"));
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        this.workerTask.sendRecords();
        Assert.assertEquals(2L, verifySendRecord(2).getAllValues().size());
        verifyTaskGetTopic(2);
        verifyTopicCreation();
    }

    @Test
    public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectSendRecord(emptyHeaders());
        Mockito.when(this.admin.describeTopics(new String[]{"topic"})).thenReturn(Collections.emptyMap());
        Mockito.when(this.admin.createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)})).thenReturn(createdTopic("topic"));
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        this.workerTask.sendRecords();
        Assert.assertEquals(2L, verifySendRecord(2).getAllValues().size());
        verifyTaskGetTopic(2);
        verifyTopicCreation();
    }

    @Test
    public void testSendRecordsRetriableException() {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectConvertHeadersAndKeyValue(emptyHeaders(), "topic");
        expectTaskGetTopic();
        Mockito.when(this.transformationChain.apply((ConnectRecord) ArgumentMatchers.eq(sourceRecord))).thenReturn((Object) null);
        Mockito.when(this.transformationChain.apply((ConnectRecord) ArgumentMatchers.eq(sourceRecord2))).thenReturn((Object) null);
        Mockito.when(this.transformationChain.apply((ConnectRecord) ArgumentMatchers.eq(sourceRecord3))).thenReturn(sourceRecord3);
        Mockito.when(this.admin.describeTopics(new String[]{"topic"})).thenReturn(Collections.singletonMap("topic", new TopicDescription("topic", false, Collections.singletonList(new TopicPartitionInfo(0, (Node) null, Collections.emptyList(), Collections.emptyList())))));
        Mockito.when(this.producer.send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any())).thenThrow(new Throwable[]{new RetriableException("Retriable exception")}).thenReturn((Object) null);
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2, sourceRecord3);
        Assert.assertFalse(this.workerTask.sendRecords());
        Assert.assertTrue(this.workerTask.sendRecords());
        ((TransformationChain) Mockito.verify(this.transformationChain, Mockito.times(1))).apply((ConnectRecord) ArgumentMatchers.eq(sourceRecord));
        ((TransformationChain) Mockito.verify(this.transformationChain, Mockito.times(1))).apply((ConnectRecord) ArgumentMatchers.eq(sourceRecord2));
        ((TransformationChain) Mockito.verify(this.transformationChain, Mockito.times(2))).apply((ConnectRecord) ArgumentMatchers.eq(sourceRecord3));
    }

    @Test
    public void testErrorReportersConfigured() {
        RetryWithToleranceOperator retryWithToleranceOperator = (RetryWithToleranceOperator) Mockito.mock(RetryWithToleranceOperator.class);
        List singletonList = Collections.singletonList(Mockito.mock(ErrorReporter.class));
        createWorkerTask(this.keyConverter, this.valueConverter, this.headerConverter, retryWithToleranceOperator, () -> {
            return singletonList;
        });
        this.workerTask.initializeAndStart();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(List.class);
        ((RetryWithToleranceOperator) Mockito.verify(retryWithToleranceOperator)).reporters((List) forClass.capture());
        Assert.assertEquals(singletonList, forClass.getValue());
    }

    @Test
    public void testErrorReporterConfigurationExceptionPropagation() {
        createWorkerTask(this.keyConverter, this.valueConverter, this.headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, () -> {
            throw new ConnectException("Failed to create error reporters");
        });
        Assert.assertThrows(ConnectException.class, () -> {
            this.workerTask.initializeAndStart();
        });
    }

    private void expectSendRecord(Headers headers) {
        if (headers != null) {
            expectConvertHeadersAndKeyValue(headers, "topic");
        }
        expectApplyTransformationChain();
        expectTaskGetTopic();
    }

    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() {
        return verifySendRecord(1);
    }

    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord(int i) {
        ArgumentCaptor<ProducerRecord<byte[], byte[]>> forClass = ArgumentCaptor.forClass(ProducerRecord.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Callback.class);
        ((KafkaProducer) Mockito.verify(this.producer, Mockito.times(i))).send((ProducerRecord) forClass.capture(), (Callback) forClass2.capture());
        Iterator it = forClass2.getAllValues().iterator();
        while (it.hasNext()) {
            ((Callback) it.next()).onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0L, 0, 0L, 0, 0), (Exception) null);
        }
        return forClass;
    }

    private void expectTaskGetTopic() {
        Mockito.when(this.statusBackingStore.getTopic(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenAnswer(invocationOnMock -> {
            return new TopicStatus((String) invocationOnMock.getArgument(1, String.class), new ConnectorTaskId((String) invocationOnMock.getArgument(0, String.class), 0), Time.SYSTEM.milliseconds());
        });
    }

    private void verifyTaskGetTopic() {
        verifyTaskGetTopic(1);
    }

    private void verifyTaskGetTopic(int i) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(String.class);
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore, Mockito.times(i))).getTopic((String) forClass.capture(), (String) forClass2.capture());
        Assert.assertEquals("job", forClass.getValue());
        Assert.assertEquals("topic", forClass2.getValue());
    }

    private void expectTopicCreation(String str) {
        Mockito.when(this.admin.createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)})).thenReturn(createdTopic(str));
    }

    private void verifyTopicCreation() {
        verifyTopicCreation(1, "topic");
    }

    private void verifyTopicCreation(int i, String... strArr) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(NewTopic.class);
        ((TopicAdmin) Mockito.verify(this.admin, Mockito.times(i))).createOrFindTopics(new NewTopic[]{(NewTopic) forClass.capture()});
        Assert.assertArrayEquals(strArr, forClass.getAllValues().stream().map((v0) -> {
            return v0.name();
        }).toArray(i2 -> {
            return new String[i2];
        }));
    }

    private TopicAdmin.TopicCreationResponse createdTopic(String str) {
        return new TopicAdmin.TopicCreationResponse(Collections.singleton(str), Collections.emptySet());
    }

    private TopicAdmin.TopicCreationResponse foundTopic(String str) {
        return new TopicAdmin.TopicCreationResponse(Collections.emptySet(), Collections.singleton(str));
    }

    private void expectPreliminaryCalls(String str) {
        expectConvertHeadersAndKeyValue(emptyHeaders(), str);
        expectApplyTransformationChain();
    }

    private void expectConvertHeadersAndKeyValue(Headers headers, String str) {
        if (headers.iterator().hasNext()) {
            Mockito.when(this.headerConverter.fromConnectHeader(ArgumentMatchers.anyString(), ArgumentMatchers.anyString(), (Schema) ArgumentMatchers.eq(Schema.STRING_SCHEMA), ArgumentMatchers.anyString())).thenAnswer(invocationOnMock -> {
                return ((String) invocationOnMock.getArgument(3, String.class)).getBytes(StandardCharsets.UTF_8);
            });
        }
        Mockito.when(this.keyConverter.fromConnectData((String) ArgumentMatchers.eq(str), (Headers) ArgumentMatchers.any(Headers.class), (Schema) ArgumentMatchers.eq(KEY_SCHEMA), ArgumentMatchers.eq(KEY))).thenReturn(SERIALIZED_KEY);
        Mockito.when(this.valueConverter.fromConnectData((String) ArgumentMatchers.eq(str), (Headers) ArgumentMatchers.any(Headers.class), (Schema) ArgumentMatchers.eq(RECORD_SCHEMA), ArgumentMatchers.eq(RECORD))).thenReturn(SERIALIZED_RECORD);
    }

    private void expectApplyTransformationChain() {
        Mockito.when(this.transformationChain.apply((ConnectRecord) ArgumentMatchers.any(SourceRecord.class))).thenAnswer(AdditionalAnswers.returnsFirstArg());
    }

    private RecordHeaders emptyHeaders() {
        return new RecordHeaders();
    }

    private void createWorkerTask() {
        createWorkerTask(this.keyConverter, this.valueConverter, this.headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, Collections::emptyList);
    }

    private void createWorkerTask(Converter converter, Converter converter2, HeaderConverter headerConverter, RetryWithToleranceOperator retryWithToleranceOperator, Supplier<List<ErrorReporter>> supplier) {
        this.workerTask = new AbstractWorkerSourceTask(this.taskId, this.sourceTask, this.statusListener, TargetState.STARTED, converter, converter2, headerConverter, this.transformationChain, this.sourceTaskContext, this.producer, this.admin, TopicCreationGroup.configuredGroups(this.sourceConfig), this.offsetReader, this.offsetWriter, this.offsetStore, this.config, this.metrics, this.errorHandlingMetrics, this.plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator, this.statusBackingStore, (v0) -> {
            v0.run();
        }, supplier) { // from class: org.apache.kafka.connect.runtime.AbstractWorkerSourceTaskTest.1
            protected void prepareToInitializeTask() {
            }

            protected void prepareToEnterSendLoop() {
            }

            protected void beginSendIteration() {
            }

            protected void prepareToPollTask() {
            }

            protected void recordDropped(SourceRecord sourceRecord) {
            }

            protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord) {
                return Optional.empty();
            }

            protected void recordDispatched(SourceRecord sourceRecord) {
            }

            protected void batchDispatched() {
            }

            protected void recordSent(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord, RecordMetadata recordMetadata) {
            }

            protected void producerSendFailed(boolean z, ProducerRecord<byte[], byte[]> producerRecord, SourceRecord sourceRecord, Exception exc) {
            }

            protected void finalOffsetCommit(boolean z) {
            }
        };
    }
}
