package io.confluent.kafkarest.v2;

import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.MetadataObserver;
import io.confluent.kafkarest.entities.BinaryConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerOffsetCommitRequest;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.mock.MockTime;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
import io.confluent.rest.RestConfigException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(EasyMockRunner.class)
/* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManagerTest.class */
public class KafkaConsumerManagerTest {
    private KafkaRestConfig config;

    @Mock
    private MetadataObserver mdObserver;

    @Mock
    private KafkaConsumerManager.KafkaConsumerFactory consumerFactory;
    private KafkaConsumerManager consumerManager;
    private static final String groupName = "testgroup";
    private static final String topicName = "testtopic";
    private boolean sawCallback = false;
    private static Exception actualException = null;
    private static List<? extends ConsumerRecord<byte[], byte[]>> actualRecords = null;
    private static List<TopicPartitionOffset> actualOffsets = null;
    private Capture<Properties> capturedConsumerConfig;
    private MockConsumer<byte[], byte[]> consumer;

    @Before
    public void setUp() throws RestConfigException {
        Properties properties = new Properties();
        properties.setProperty("consumer.request.max.bytes", "1024");
        properties.setProperty("consumer.exclude.internal.topics", "false");
        this.config = new KafkaRestConfig(properties, new MockTime());
        this.consumerManager = new KafkaConsumerManager(this.config, this.consumerFactory);
        this.consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    }

    @After
    public void tearDown() {
        this.consumerManager.shutdown();
    }

    private void expectCreate() {
        this.capturedConsumerConfig = Capture.newInstance();
        EasyMock.expect(this.consumerFactory.createConsumer((Properties) EasyMock.capture(this.capturedConsumerConfig))).andReturn(this.consumer);
    }

    @Test
    public void testConsumerOverrides() {
        Capture newInstance = Capture.newInstance();
        EasyMock.expect(this.consumerFactory.createConsumer((Properties) EasyMock.capture(newInstance))).andReturn(this.consumer);
        EasyMock.replay(new Object[]{this.consumerFactory});
        this.consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));
        Assert.assertEquals("false", ((Properties) newInstance.getValue()).get("exclude.internal.topics"));
        EasyMock.verify(new Object[]{this.consumerFactory});
    }

    @Test
    public void testConsumerNormalOps() throws InterruptedException, ExecutionException {
        List asList = Arrays.asList(new BinaryConsumerRecord(topicName, "k1".getBytes(), "v1".getBytes(), 0, 0L), new BinaryConsumerRecord(topicName, "k2".getBytes(), "v2".getBytes(), 0, 1L), new BinaryConsumerRecord(topicName, "k3".getBytes(), "v3".getBytes(), 0, 2L));
        expectCreate();
        this.consumer.schedulePollTask(new Runnable() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.1
            @Override // java.lang.Runnable
            public void run() {
                KafkaConsumerManagerTest.this.consumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord(KafkaConsumerManagerTest.topicName, 0, 0L, "k1".getBytes(), "v1".getBytes()));
                KafkaConsumerManagerTest.this.consumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord(KafkaConsumerManagerTest.topicName, 0, 1L, "k2".getBytes(), "v2".getBytes()));
                KafkaConsumerManagerTest.this.consumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord(KafkaConsumerManagerTest.topicName, 0, 2L, "k3".getBytes(), "v3".getBytes()));
            }
        });
        EasyMock.replay(new Object[]{this.mdObserver, this.consumerFactory});
        String createConsumer = this.consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));
        this.consumerManager.subscribe(groupName, createConsumer, new ConsumerSubscriptionRecord(Collections.singletonList(topicName), (String) null));
        this.consumer.rebalance(Collections.singletonList(new TopicPartition(topicName, 0)));
        this.consumer.updateBeginningOffsets(Collections.singletonMap(new TopicPartition(topicName, 0), 0L));
        this.sawCallback = false;
        actualException = null;
        actualRecords = null;
        this.consumerManager.readRecords(groupName, createConsumer, BinaryKafkaConsumerState.class, -1L, Long.MAX_VALUE, new KafkaConsumerManager.ReadCallback<byte[], byte[]>() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.2
            public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> list, Exception exc) {
                Exception unused = KafkaConsumerManagerTest.actualException = exc;
                List unused2 = KafkaConsumerManagerTest.actualRecords = list;
                KafkaConsumerManagerTest.this.sawCallback = true;
            }
        }).get();
        Assert.assertTrue("Callback failed to fire", this.sawCallback);
        Assert.assertNull("No exception in callback", actualException);
        Assert.assertEquals("Records returned not as expected", asList, actualRecords);
        Assert.assertEquals(this.config.getInt("consumer.request.timeout.ms"), this.config.getTime().milliseconds());
        this.sawCallback = false;
        actualException = null;
        actualOffsets = null;
        this.consumerManager.commitOffsets(groupName, createConsumer, (String) null, (ConsumerOffsetCommitRequest) null, new KafkaConsumerManager.CommitCallback() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.3
            public void onCompletion(List<TopicPartitionOffset> list, Exception exc) {
                KafkaConsumerManagerTest.this.sawCallback = true;
                Exception unused = KafkaConsumerManagerTest.actualException = exc;
                List unused2 = KafkaConsumerManagerTest.actualOffsets = list;
            }
        }).get();
        Assert.assertTrue("Callback not called", this.sawCallback);
        Assert.assertNull("Callback exception", actualException);
        Assert.assertNotNull("Callback Offsets", actualOffsets);
        this.consumerManager.deleteConsumer(groupName, createConsumer);
        EasyMock.verify(new Object[]{this.mdObserver, this.consumerFactory});
    }
}
