package org.apache.kafka.connect.util;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

@PrepareForTest({KafkaBasedLog.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* Unit tests for KafkaBasedLog; decompiled from input_file:org/apache/kafka/connect/util/KafkaBasedLogTest.class */
public class KafkaBasedLogTest {
    // Topic backing the log under test, and the two partitions the tests work with.
    private static final String TOPIC = "connect-log";
    private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0);
    private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1);

    // Client configuration maps passed to the KafkaBasedLog constructor; filled in by the static initializer.
    private static final Map<String, Object> PRODUCER_PROPS = new HashMap<>();
    private static final Map<String, Object> CONSUMER_PROPS;

    // Assignment the mock consumer is expected to end up with: both partitions of TOPIC.
    private static final Set<TopicPartition> CONSUMER_ASSIGNMENT;

    // Not referenced by any test in this class.
    private static final Map<String, String> FIRST_SET;

    // Cluster metadata fed to the mock consumer in setUp().
    private static final Node LEADER;
    private static final Node REPLICA;
    private static final PartitionInfo TPINFO0;
    private static final PartitionInfo TPINFO1;

    // Record keys and values used by the tests, one pair (plus an updated value) per partition.
    private static final String TP0_KEY = "TP0KEY";
    private static final String TP1_KEY = "TP1KEY";
    private static final String TP0_VALUE = "VAL0";
    private static final String TP1_VALUE = "VAL1";
    private static final String TP0_VALUE_NEW = "VAL0_NEW";
    private static final String TP1_VALUE_NEW = "VAL1_NEW";

    // Partial mock of the class under test; rebuilt in setUp() (or setupWithAdmin()).
    private KafkaBasedLog<String, String> store;

    @Mock
    private Runnable initializer;

    @Mock
    private KafkaProducer<String, String> producer;

    // Created fresh in setUp(); returned from the mocked createConsumer().
    private MockConsumer<String, String> consumer;

    @Mock
    private TopicAdmin admin;

    private final Time time = new MockTime();

    // Every record delivered to consumedCallback, grouped by partition in arrival order.
    private final Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
    private Callback<ConsumerRecord<String, String>> consumedCallback = (th, consumerRecord) -> {
        TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        List<ConsumerRecord<String, String>> list = this.consumedRecords.get(topicPartition);
        if (list == null) {
            list = new ArrayList();
            this.consumedRecords.put(topicPartition, list);
        }
        list.add(consumerRecord);
    };

    /**
     * Builds a partial mock of {@link KafkaBasedLog} whose {@code createConsumer} and
     * {@code createProducer} methods are mocked, and a fresh {@link MockConsumer} that knows both
     * partitions of the topic with beginning offsets of 0.
     */
    @SuppressWarnings("unchecked")
    @Before
    public void setUp() {
        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);

        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));

        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(TP0, 0L);
        beginningOffsets.put(TP1, 0L);
        consumer.updateBeginningOffsets(beginningOffsets);
    }

    /**
     * Starts the log against empty partitions (end offsets 0) and stops it again, checking the
     * consumer assignment after start and that the store's {@code thread} is dead and the
     * consumer closed after stop.
     */
    @Test
    public void testStartStop() throws Exception {
        expectStart();
        expectStop();
        PowerMock.replayAll();

        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(TP0, 0L);
        endOffsets.put(TP1, 0L);
        consumer.updateEndOffsets(endOffsets);

        store.start();
        Assert.assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());

        store.stop();
        Thread workThread = Whitebox.<Thread>getInternalState(store, "thread");
        Assert.assertFalse(workThread.isAlive());
        Assert.assertTrue(consumer.closed());
        PowerMock.verifyAll();
    }

    /**
     * Starts the log with one record pending in each partition (end offsets 1) and verifies that
     * both records are handed to the consumed callback.
     * <p>
     * The records are fed to the mock consumer through a chain of scheduled poll tasks; the last
     * task counts down a latch, which the test waits on before asserting.
     */
    @Test
    public void testReloadOnStart() throws Exception {
        expectStart();
        expectStop();
        PowerMock.replayAll();

        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(TP0, 1L);
        endOffsets.put(TP1, 1L);
        consumer.updateEndOffsets(endOffsets);

        final CountDownLatch finishedLatch = new CountDownLatch(1);
        consumer.schedulePollTask(() -> {
            // The first poll task queues the rest of the sequence: no-op polls interleaved
            // with one record for each partition.
            consumer.scheduleNopPollTask();
            consumer.scheduleNopPollTask();
            consumer.schedulePollTask(() ->
                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE, new RecordHeaders(), Optional.empty())));
            consumer.scheduleNopPollTask();
            consumer.scheduleNopPollTask();
            consumer.schedulePollTask(() ->
                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY, TP1_VALUE, new RecordHeaders(), Optional.empty())));
            consumer.schedulePollTask(finishedLatch::countDown);
        });

        store.start();
        Assert.assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));

        Assert.assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
        Assert.assertEquals(2, consumedRecords.size());
        Assert.assertEquals(TP0_VALUE, consumedRecords.get(TP0).get(0).value());
        Assert.assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value());

        store.stop();
        Assert.assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
        Assert.assertTrue(consumer.closed());
        PowerMock.verifyAll();
    }

    /**
     * Starts the log when beginning and end offsets are both 7 for each partition, so there is
     * nothing to read, and checks that the consumer is positioned at offset 7 on both partitions.
     * The single scheduled poll task throws a {@link WakeupException}.
     */
    @Test
    public void testReloadOnStartWithNoNewRecordsPresent() throws Exception {
        expectStart();
        expectStop();
        PowerMock.replayAll();

        // The same map serves as both end and beginning offsets.
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(TP0, 7L);
        offsets.put(TP1, 7L);
        consumer.updateEndOffsets(offsets);
        consumer.updateBeginningOffsets(offsets);

        consumer.schedulePollTask(() -> {
            throw new WakeupException();
        });

        store.start();
        Assert.assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
        Assert.assertEquals(7L, consumer.position(TP0));
        Assert.assertEquals(7L, consumer.position(TP1));

        store.stop();
        Thread workThread = Whitebox.<Thread>getInternalState(store, "thread");
        Assert.assertFalse(workThread.isAlive());
        Assert.assertTrue(consumer.closed());
        PowerMock.verifyAll();
    }

    /**
     * Sends one record per partition through the log, completes the two producer callbacks in
     * reverse order, then issues a {@code readToEnd} and verifies that the four records fed to
     * the mock consumer have reached the consumed callback once the read-to-end future completes.
     */
    @Test
    public void testSendAndReadToEnd() throws Exception {
        expectStart();

        // Expect one send per partition; capture the producer callbacks so the test can fire them.
        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
        ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
        Capture<Callback> tp0ProducerCallback = EasyMock.newCapture();
        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(tp0ProducerCallback))).andReturn(tp0Future);

        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
        ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE);
        Capture<Callback> tp1ProducerCallback = EasyMock.newCapture();
        EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(tp1ProducerCallback))).andReturn(tp1Future);

        // A flush is expected in addition to the two sends.
        producer.flush();
        PowerMock.expectLastCall();
        expectStop();
        PowerMock.replayAll();

        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(TP0, 0L);
        endOffsets.put(TP1, 0L);
        consumer.updateEndOffsets(endOffsets);

        store.start();
        Assert.assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
        Assert.assertEquals(0L, consumer.position(TP0));
        Assert.assertEquals(0L, consumer.position(TP1));

        // Here Callback is the producer callback (see the file's imports).
        final AtomicInteger invoked = new AtomicInteger(0);
        Callback producerCallback = (metadata, exception) -> invoked.incrementAndGet();
        store.send(TP0_KEY, TP0_VALUE, producerCallback);
        store.send(TP1_KEY, TP1_VALUE, producerCallback);
        Assert.assertEquals(0, invoked.get());

        // Complete the sends out of order: partition 1 first, then partition 0.
        tp1Future.resolve((RecordMetadata) null);
        tp1ProducerCallback.getValue().onCompletion(null, null);
        Assert.assertEquals(1, invoked.get());
        tp0Future.resolve((RecordMetadata) null);
        tp0ProducerCallback.getValue().onCompletion(null, null);
        Assert.assertEquals(2, invoked.get());

        final AtomicBoolean getInvoked = new AtomicBoolean(false);
        final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>((error, result) -> getInvoked.set(true));
        consumer.schedulePollTask(() -> {
            // Move the end offsets to 2, request the read-to-end, then queue the polls that
            // deliver two records per partition.
            Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
            newEndOffsets.put(TP0, 2L);
            newEndOffsets.put(TP1, 2L);
            consumer.updateEndOffsets(newEndOffsets);
            store.readToEnd(readEndFutureCallback);

            consumer.scheduleNopPollTask();
            consumer.scheduleNopPollTask();
            consumer.scheduleNopPollTask();
            consumer.schedulePollTask(() -> {
                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE, new RecordHeaders(), Optional.empty()));
                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW, new RecordHeaders(), Optional.empty()));
                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY, TP1_VALUE, new RecordHeaders(), Optional.empty()));
            });
            consumer.schedulePollTask(() ->
                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY, TP1_VALUE_NEW, new RecordHeaders(), Optional.empty())));
        });

        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
        Assert.assertTrue(getInvoked.get());
        Assert.assertEquals(2, consumedRecords.size());

        Assert.assertEquals(2, consumedRecords.get(TP0).size());
        Assert.assertEquals(TP0_VALUE, consumedRecords.get(TP0).get(0).value());
        Assert.assertEquals(TP0_VALUE_NEW, consumedRecords.get(TP0).get(1).value());

        Assert.assertEquals(2, consumedRecords.get(TP1).size());
        Assert.assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value());
        Assert.assertEquals(TP1_VALUE_NEW, consumedRecords.get(TP1).get(1).value());

        store.stop();
        Assert.assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
        Assert.assertTrue(consumer.closed());
        PowerMock.verifyAll();
    }

    /**
     * Makes one consumer poll fail with the {@code COORDINATOR_NOT_AVAILABLE} exception during
     * startup, then delivers one record per partition, and verifies that the log still starts
     * and the consumer ends up at position 1 on partition 0.
     */
    @Test
    public void testPollConsumerError() throws Exception {
        expectStart();
        expectStop();
        PowerMock.replayAll();

        final CountDownLatch finishedLatch = new CountDownLatch(1);
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(TP0, 1L);
        endOffsets.put(TP1, 1L);
        consumer.updateEndOffsets(endOffsets);

        consumer.schedulePollTask(() -> {
            // Queue, in order: a poll that raises the error, two no-op polls, a poll that
            // delivers both records, and a final poll that releases the latch.
            consumer.schedulePollTask(() ->
                consumer.setPollException(Errors.COORDINATOR_NOT_AVAILABLE.exception()));
            consumer.scheduleNopPollTask();
            consumer.scheduleNopPollTask();
            consumer.schedulePollTask(() -> {
                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW, new RecordHeaders(), Optional.empty()));
                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW, new RecordHeaders(), Optional.empty()));
            });
            consumer.schedulePollTask(finishedLatch::countDown);
        });

        store.start();
        Assert.assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
        Assert.assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
        Assert.assertEquals(1L, consumer.position(TP0));

        store.stop();
        Assert.assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
        Assert.assertTrue(consumer.closed());
        PowerMock.verifyAll();
    }

    /**
     * Arms the mock consumer with a {@link TimeoutException} for offset lookups just before a
     * {@code readToEnd} is requested, then delivers one record per partition and verifies that
     * the read-to-end future still completes and the consumer reaches position 1 on partition 0.
     */
    @Test
    public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
        expectStart();
        // A flush is expected on the producer in this scenario.
        producer.flush();
        PowerMock.expectLastCall();
        expectStop();
        PowerMock.replayAll();

        final CountDownLatch finishedLatch = new CountDownLatch(1);
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(TP0, 0L);
        endOffsets.put(TP1, 0L);
        consumer.updateEndOffsets(endOffsets);
        store.start();

        final AtomicBoolean getInvoked = new AtomicBoolean(false);
        final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>((error, result) -> getInvoked.set(true));
        consumer.schedulePollTask(() -> {
            // Move the end offsets to 1, arm the offsets failure, and only then request the read.
            Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
            newEndOffsets.put(TP0, 1L);
            newEndOffsets.put(TP1, 1L);
            consumer.updateEndOffsets(newEndOffsets);
            consumer.setOffsetsException(new TimeoutException("Failed to get offsets by times"));
            store.readToEnd(readEndFutureCallback);

            consumer.scheduleNopPollTask();
            consumer.scheduleNopPollTask();
            consumer.schedulePollTask(() -> {
                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE, new RecordHeaders(), Optional.empty()));
                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW, new RecordHeaders(), Optional.empty()));
            });
            consumer.schedulePollTask(finishedLatch::countDown);
        });

        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
        Assert.assertTrue(getInvoked.get());
        Assert.assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
        Assert.assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
        Assert.assertEquals(1L, consumer.position(TP0));

        store.stop();
        Assert.assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
        Assert.assertTrue(consumer.closed());
        PowerMock.verifyAll();
    }

    /**
     * Sends a single record and fails it with a {@link LeaderNotAvailableException}, checking
     * that the caller's callback receives a non-null exception and is not invoked while an
     * exception has already been recorded.
     */
    @Test
    public void testProducerError() throws Exception {
        expectStart();
        TestFuture<RecordMetadata> sendFuture = new TestFuture<>();
        ProducerRecord<String, String> expectedRecord = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
        Capture<Callback> capturedCallback = EasyMock.newCapture();
        EasyMock.expect(producer.send(EasyMock.eq(expectedRecord), EasyMock.capture(capturedCallback))).andReturn(sendFuture);
        expectStop();
        PowerMock.replayAll();

        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(TP0, 0L);
        endOffsets.put(TP1, 0L);
        consumer.updateEndOffsets(endOffsets);

        store.start();
        Assert.assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
        Assert.assertEquals(0L, consumer.position(TP0));
        Assert.assertEquals(0L, consumer.position(TP1));

        final AtomicReference<Throwable> reportedError = new AtomicReference<>();
        store.send(TP0_KEY, TP0_VALUE, (metadata, exception) -> {
            // Asserts that no exception has been recorded before this invocation.
            Assert.assertNull(reportedError.get());
            reportedError.set(exception);
        });

        LeaderNotAvailableException failure = new LeaderNotAvailableException("Error");
        sendFuture.resolve((Throwable) failure);
        capturedCallback.getValue().onCompletion(null, failure);
        Assert.assertNotNull(reportedError.get());

        store.stop();
        Thread workThread = Whitebox.<Thread>getInternalState(store, "thread");
        Assert.assertFalse(workThread.isAlive());
        Assert.assertTrue(consumer.closed());
        PowerMock.verifyAll();
    }

    /**
     * With an admin-backed log, expects {@code admin.endOffsets} to be called twice with both
     * partitions and checks that {@code readEndOffsets} returns the map the admin handed back.
     */
    @Test
    public void testReadEndOffsetsUsingAdmin() throws Exception {
        setupWithAdmin();
        expectProducerAndConsumerCreate();

        Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(TP0, TP1));
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(TP0, 0L);
        endOffsets.put(TP1, 0L);

        admin.endOffsets(EasyMock.eq(partitions));
        PowerMock.expectLastCall().andReturn(endOffsets).times(2);
        PowerMock.replayAll();

        store.start();
        Assert.assertEquals(endOffsets, store.readEndOffsets(partitions));
    }

    /**
     * With an admin-backed log whose {@code endOffsets} call throws
     * {@link UnsupportedVersionException}, checks that {@code readEndOffsets} still returns the
     * end offsets that were registered on the mock consumer.
     */
    @Test
    public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws Exception {
        setupWithAdmin();
        expectProducerAndConsumerCreate();

        Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(TP0, TP1));
        admin.endOffsets(EasyMock.eq(partitions));
        PowerMock.expectLastCall().andThrow(new UnsupportedVersionException("too old"));

        // These offsets are registered on the consumer only; the admin mock never returns them.
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(TP0, 0L);
        endOffsets.put(TP1, 0L);
        consumer.updateEndOffsets(endOffsets);
        PowerMock.replayAll();

        store.start();
        Assert.assertEquals(endOffsets, store.readEndOffsets(partitions));
    }

    /**
     * With an admin-backed log whose first {@code endOffsets} call succeeds and whose second
     * call throws {@link LeaderNotAvailableException}, checks that {@code readEndOffsets}
     * lets that exception reach the caller.
     */
    @Test
    public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception {
        setupWithAdmin();
        expectProducerAndConsumerCreate();

        Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(TP0, TP1));
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(TP0, 0L);
        endOffsets.put(TP1, 0L);

        // First call returns offsets; the call after that throws.
        admin.endOffsets(EasyMock.eq(partitions));
        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
        admin.endOffsets(EasyMock.eq(partitions));
        PowerMock.expectLastCall().andThrow(new LeaderNotAvailableException("retry"));
        PowerMock.replayAll();

        store.start();
        Assert.assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(partitions));
    }

    /**
     * Replaces {@code store} with a partial mock built through the constructor variant that
     * takes an admin supplier and an admin-accepting initializer instead of a {@link Runnable}.
     * <p>
     * The two lambdas are bound to typed locals first: a lambda needs a functional-interface
     * target type, so it cannot be written directly among the {@code Object} constructor
     * arguments. The typed locals also give PowerMock concrete argument types to work with.
     */
    @SuppressWarnings("unchecked")
    private void setupWithAdmin() {
        Supplier<TopicAdmin> adminSupplier = () -> admin;
        // Initializer that deliberately does nothing with the admin it is given.
        Consumer<TopicAdmin> noOpInitializer = topicAdmin -> { };
        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, noOpInitializer);
    }

    /**
     * Records expectations that the store's private {@code createProducer} and
     * {@code createConsumer} methods are called without arguments, returning the mocked
     * producer and the mock consumer respectively.
     */
    private void expectProducerAndConsumerCreate() throws Exception {
        PowerMock.expectPrivate(store, "createProducer").andReturn(producer);
        PowerMock.expectPrivate(store, "createConsumer").andReturn(consumer);
    }

    /**
     * Records the expectations shared by every test that starts the log: the initializer runs
     * exactly once, and the producer and consumer are created.
     */
    private void expectStart() throws Exception {
        initializer.run();
        EasyMock.expectLastCall().times(1);

        expectProducerAndConsumerCreate();
    }

    /**
     * Records the expectation that the mocked producer is closed. The consumer is a
     * {@link MockConsumer}, not a mock, so each test asserts its closure directly instead.
     */
    private void expectStop() {
        producer.close();
        PowerMock.expectLastCall();
    }

    /**
     * Wraps the UTF-8 encoding of {@code str} in a {@link ByteBuffer}.
     * <p>
     * The charset is explicit so the bytes do not depend on the platform default encoding.
     * Not called by any test in this class.
     *
     * @param str text to encode; must not be {@code null}
     * @return a buffer backed by the encoded bytes
     */
    private static ByteBuffer buffer(String str) {
        return ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));
    }

    static {
        PRODUCER_PROPS.put("bootstrap.servers", "broker1:9092,broker2:9093");
        PRODUCER_PROPS.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        PRODUCER_PROPS.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        CONSUMER_PROPS = new HashMap();
        CONSUMER_PROPS.put("bootstrap.servers", "broker1:9092,broker2:9093");
        CONSUMER_PROPS.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        CONSUMER_PROPS.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        CONSUMER_ASSIGNMENT = new HashSet(Arrays.asList(TP0, TP1));
        FIRST_SET = new HashMap();
        FIRST_SET.put("key", "value");
        FIRST_SET.put(null, null);
        LEADER = new Node(1, "broker1", 9092);
        REPLICA = new Node(1, "broker2", 9093);
        TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
        TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
    }
}
