package io.confluent.kafkarest.unit;

import io.confluent.kafkarest.BinaryConsumerState;
import io.confluent.kafkarest.ConsumerManager;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.MetadataObserver;
import io.confluent.kafkarest.entities.BinaryConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.mock.MockConsumerConnector;
import io.confluent.kafkarest.mock.MockTime;
import io.confluent.rest.RestConfigException;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestNotFoundException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Function0;

/* loaded from: input_file:io/confluent/kafkarest/unit/ConsumerManagerTest.class */
/**
 * Unit tests for {@link ConsumerManager}.
 *
 * <p>The manager under test is wired with EasyMock mocks of {@link MetadataObserver} and
 * {@link ConsumerManager.ConsumerFactory}; the factory mock hands out
 * {@link MockConsumerConnector} instances, and the configuration is driven by a
 * {@link MockTime}.
 */
public class ConsumerManagerTest {
    private static final String GROUP_NAME = "testgroup";
    private static final String TOPIC_NAME = "testtopic";
    private static final String SECOND_TOPIC_NAME = "testtopic2";

    private KafkaRestConfig config;
    private MetadataObserver mdObserver;
    private ConsumerManager.ConsumerFactory consumerFactory;
    private ConsumerManager consumerManager;

    /** Set to {@code true} by every callback so tests can prove the callback actually ran. */
    private boolean sawCallback = false;

    /** Captures the {@link ConsumerConfig} handed to the factory by the latest {@code expectCreate}. */
    private Capture<ConsumerConfig> capturedConsumerConfig;

    /**
     * Builds a fresh config, fresh mocks and a fresh {@link ConsumerManager} before each test.
     *
     * @throws RestConfigException declared because the {@link KafkaRestConfig} constructor may throw it
     */
    @Before
    public void setUp() throws RestConfigException {
        Properties props = new Properties();
        props.setProperty("consumer.request.max.bytes", "1024");
        props.setProperty("exclude.internal.topics", "false");
        config = new KafkaRestConfig(props, new MockTime());
        mdObserver = EasyMock.createMock(MetadataObserver.class);
        consumerFactory = EasyMock.createMock(ConsumerManager.ConsumerFactory.class);
        consumerManager = new ConsumerManager(config, mdObserver, consumerFactory);
    }

    /**
     * Records a factory expectation using {@code false} for the connector flag and no requested id.
     *
     * @param schedules per-topic schedules handed to the mock connector (may be {@code null})
     * @return the mock connector the factory will return
     */
    private ConsumerConnector expectCreate(
            Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> schedules) {
        return expectCreate(schedules, false, null);
    }

    /**
     * Records an expectation that the factory is asked for a consumer once, returning a new
     * {@link MockConsumerConnector} and capturing the {@link ConsumerConfig} it was called with
     * into {@link #capturedConsumerConfig}.
     *
     * @param schedules per-topic schedules handed to the mock connector (may be {@code null})
     * @param connectorFlag forwarded as the last {@link MockConsumerConnector} constructor argument;
     *     NOTE(review): presumably lets the mock tolerate topics without a schedule - confirm
     * @param requestedId currently unused by this helper
     * @return the mock connector the factory will return
     */
    private ConsumerConnector expectCreate(
            Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> schedules,
            boolean connectorFlag,
            String requestedId) {
        // NOTE(review): the literal 1 was an inlined Integer.parseInt("1"); presumably a config
        // default (iterator timeout?) that the compiler folded in - confirm against KafkaRestConfig.
        ConsumerConnector consumer =
                new MockConsumerConnector(config.getTime(), "testclient", schedules, 1, connectorFlag);
        capturedConsumerConfig = new Capture<>();
        EasyMock.expect(consumerFactory.createConsumer(EasyMock.capture(capturedConsumerConfig)))
                .andReturn(consumer);
        return consumer;
    }

    /**
     * Records a factory expectation whose connector has a single, empty schedule for
     * {@link #TOPIC_NAME} and is created with the connector flag set to {@code true}.
     *
     * @param requestedId passed through to {@link #expectCreate(Map, boolean, String)}
     * @return the mock connector the factory will return
     */
    private ConsumerConnector expectCreateNoData(String requestedId) {
        Map<Integer, List<ConsumerRecord<byte[], byte[]>>> emptySchedule = new HashMap<>();
        return expectCreate(singleTopicSchedules(emptySchedule), true, requestedId);
    }

    /** Same as {@link #expectCreateNoData(String)} with a {@code null} requested id. */
    private ConsumerConnector expectCreateNoData() {
        return expectCreateNoData(null);
    }

    /**
     * Records a factory expectation whose connector serves {@code records} for {@link #TOPIC_NAME}.
     *
     * @param records the records placed in the schedule under key 50;
     *     NOTE(review): the key is presumably a mock-time offset - confirm in MockConsumerConnector
     */
    private void expectCreateWithRecords(List<ConsumerRecord<byte[], byte[]>> records) {
        Map<Integer, List<ConsumerRecord<byte[], byte[]>>> schedule = new HashMap<>();
        schedule.put(50, records);
        expectCreate(singleTopicSchedules(schedule));
    }

    /** Wraps one schedule into the topic-to-schedules map shape, keyed by {@link #TOPIC_NAME}. */
    private static Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> singleTopicSchedules(
            Map<Integer, List<ConsumerRecord<byte[], byte[]>>> schedule) {
        Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> schedules = new HashMap<>();
        schedules.put(TOPIC_NAME, Arrays.asList(schedule));
        return schedules;
    }

    /**
     * Issues a binary read, blocks on the returned future, and asserts the callback ran.
     *
     * @param instanceId consumer instance id returned by {@code createConsumer}
     * @param topic topic to read from
     * @param maxBytes value passed as the read's size-limit argument
     * @param callback callback under test; it must set {@link #sawCallback} to {@code true}
     * @throws InterruptedException if waiting on the read future is interrupted
     * @throws ExecutionException if the read future completes exceptionally
     */
    private void readAndAwaitCallback(
            String instanceId,
            String topic,
            long maxBytes,
            ConsumerManager.ReadCallback<byte[], byte[]> callback)
            throws InterruptedException, ExecutionException {
        sawCallback = false;
        consumerManager
                .readTopic(GROUP_NAME, instanceId, topic, BinaryConsumerState.class, maxBytes, callback)
                .get();
        Assert.assertTrue(sawCallback);
    }

    /**
     * With {@code exclude.internal.topics=false} in the REST config, the {@link ConsumerConfig}
     * passed to the factory must report {@code excludeInternalTopics() == false}.
     */
    @Test
    public void testConsumerOverrides() {
        expectCreate(null, true, null);
        EasyMock.replay(consumerFactory);

        consumerManager.createConsumer(GROUP_NAME, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));

        Assert.assertFalse(capturedConsumerConfig.getValue().excludeInternalTopics());
        EasyMock.verify(consumerFactory);
    }

    /**
     * Happy path: create a consumer, read all scheduled records, commit offsets, delete.
     */
    @Test
    public void testConsumerNormalOps() throws InterruptedException, ExecutionException {
        final List<ConsumerRecord<byte[], byte[]>> referenceRecords =
                Arrays.<ConsumerRecord<byte[], byte[]>>asList(
                        new BinaryConsumerRecord("k1".getBytes(), "v1".getBytes(), 0, 0L),
                        new BinaryConsumerRecord("k2".getBytes(), "v2".getBytes(), 1, 0L),
                        new BinaryConsumerRecord("k3".getBytes(), "v3".getBytes(), 2, 0L));
        expectCreateWithRecords(referenceRecords);
        EasyMock.expect(mdObserver.topicExists(TOPIC_NAME)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId =
                consumerManager.createConsumer(GROUP_NAME, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));

        readAndAwaitCallback(instanceId, TOPIC_NAME, Long.MAX_VALUE,
                new ConsumerManager.ReadCallback<byte[], byte[]>() {
                    @Override
                    public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> records, Exception e) {
                        sawCallback = true;
                        Assert.assertNull(e);
                        Assert.assertEquals(referenceRecords, records);
                    }
                });
        // After the read, the mock clock must equal request timeout + iterator timeout.
        Assert.assertEquals(
                config.getInt("consumer.request.timeout.ms") + config.getInt("consumer.iterator.timeout.ms"),
                config.getTime().milliseconds());

        sawCallback = false;
        consumerManager.commitOffsets(GROUP_NAME, instanceId, new ConsumerManager.CommitCallback() {
            @Override
            public void onCompletion(List<TopicPartitionOffset> offsets, Exception e) {
                sawCallback = true;
                Assert.assertNull(e);
                Assert.assertNotNull(offsets);
                Assert.assertEquals(3L, offsets.size());
            }
        }).get();
        Assert.assertTrue(sawCallback);

        consumerManager.deleteConsumer(GROUP_NAME, instanceId);
        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * Four records with 512-byte values are scheduled. The first read (limit
     * {@code Long.MAX_VALUE}) must return two records and the second read (limit 512) one.
     * NOTE(review): the first cap presumably comes from {@code consumer.request.max.bytes=1024}
     * set in {@link #setUp()} - confirm against ConsumerManager.
     */
    @Test
    public void testConsumerMaxBytesResponse() throws InterruptedException, ExecutionException {
        List<ConsumerRecord<byte[], byte[]>> referenceRecords =
                Arrays.<ConsumerRecord<byte[], byte[]>>asList(
                        new BinaryConsumerRecord((byte[]) null, new byte[512], 0, 0L),
                        new BinaryConsumerRecord((byte[]) null, new byte[512], 1, 0L),
                        new BinaryConsumerRecord((byte[]) null, new byte[512], 2, 0L),
                        new BinaryConsumerRecord((byte[]) null, new byte[512], 3, 0L));
        expectCreateWithRecords(referenceRecords);
        // One topicExists lookup is expected per read below.
        EasyMock.expect(mdObserver.topicExists(TOPIC_NAME)).andReturn(true);
        EasyMock.expect(mdObserver.topicExists(TOPIC_NAME)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId =
                consumerManager.createConsumer(GROUP_NAME, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));

        readAndAwaitCallback(instanceId, TOPIC_NAME, Long.MAX_VALUE,
                new ConsumerManager.ReadCallback<byte[], byte[]>() {
                    @Override
                    public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> records, Exception e) {
                        sawCallback = true;
                        Assert.assertNull(e);
                        Assert.assertEquals(2L, records.size());
                    }
                });

        readAndAwaitCallback(instanceId, TOPIC_NAME, 512L,
                new ConsumerManager.ReadCallback<byte[], byte[]>() {
                    @Override
                    public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> records, Exception e) {
                        sawCallback = true;
                        Assert.assertNull(e);
                        Assert.assertEquals(1L, records.size());
                    }
                });

        consumerManager.deleteConsumer(GROUP_NAME, instanceId);
        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * When both an id and a name are supplied, the id is returned by {@code createConsumer}
     * and is the consumer id found in the captured {@link ConsumerConfig}.
     */
    @Test
    public void testIDOverridesName() {
        expectCreateNoData("id");
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId = consumerManager.createConsumer(
                GROUP_NAME,
                new ConsumerInstanceConfig("id", "name", EmbeddedFormat.BINARY.toString(), (String) null, (String) null));

        Assert.assertEquals("id", instanceId);
        // consumerId() is a Scala Option; a null default thunk is passed to getOrElse, as in the original.
        Assert.assertEquals("id", capturedConsumerConfig.getValue().consumerId().getOrElse((Function0) null));
        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * Creating a second consumer with the same name in the same group must fail with a
     * {@link RestException} carrying error code 40902.
     */
    @Test
    public void testDuplicateConsumerName() {
        expectCreateNoData();
        EasyMock.replay(mdObserver, consumerFactory);

        consumerManager.createConsumer(
                GROUP_NAME,
                new ConsumerInstanceConfig((String) null, "name", EmbeddedFormat.BINARY.toString(), (String) null, (String) null));
        try {
            consumerManager.createConsumer(
                    GROUP_NAME,
                    new ConsumerInstanceConfig((String) null, "name", EmbeddedFormat.BINARY.toString(), (String) null, (String) null));
            Assert.fail("Expected to see exception because consumer already exists");
        } catch (RestException e) {
            Assert.assertEquals(40902L, e.getErrorCode());
        }
        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * After a consumer instance has read one topic, reading a second topic through the same
     * instance must complete with a {@link RestException} carrying error code 40901.
     */
    @Test
    public void testMultipleTopicSubscriptionsFail() throws InterruptedException, ExecutionException {
        expectCreateNoData();
        EasyMock.expect(mdObserver.topicExists(TOPIC_NAME)).andReturn(true);
        EasyMock.expect(mdObserver.topicExists(SECOND_TOPIC_NAME)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId =
                consumerManager.createConsumer(GROUP_NAME, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));

        readAndAwaitCallback(instanceId, TOPIC_NAME, Long.MAX_VALUE,
                new ConsumerManager.ReadCallback<byte[], byte[]>() {
                    @Override
                    public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> records, Exception e) {
                        sawCallback = true;
                        Assert.assertNull(e);
                        Assert.assertEquals(0L, records.size());
                    }
                });

        readAndAwaitCallback(instanceId, SECOND_TOPIC_NAME, Long.MAX_VALUE,
                new ConsumerManager.ReadCallback<byte[], byte[]>() {
                    @Override
                    public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> records, Exception e) {
                        sawCallback = true;
                        Assert.assertNotNull(e);
                        Assert.assertTrue(e instanceof RestException);
                        Assert.assertEquals(40901L, ((RestException) e).getErrorCode());
                        Assert.assertNull(records);
                    }
                });

        consumerManager.deleteConsumer(GROUP_NAME, instanceId);
        EasyMock.verify(mdObserver, consumerFactory);
    }

    /** Reading through an instance id that was never created must fail immediately as not-found. */
    @Test
    public void testReadInvalidInstanceFails() {
        readAndExpectImmediateNotFound("invalid", TOPIC_NAME);
    }

    /** Reading a topic the metadata observer reports as missing must fail immediately as not-found. */
    @Test
    public void testReadInvalidTopicFails() throws InterruptedException, ExecutionException {
        expectCreate(null);
        EasyMock.expect(mdObserver.topicExists("invalidtopic")).andReturn(false);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId =
                consumerManager.createConsumer(GROUP_NAME, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));
        readAndExpectImmediateNotFound(instanceId, "invalidtopic");

        EasyMock.verify(mdObserver, consumerFactory);
    }

    /** Deleting an unknown consumer instance must throw {@link RestNotFoundException}. */
    @Test(expected = RestNotFoundException.class)
    public void testDeleteInvalidConsumer() {
        consumerManager.deleteConsumer(GROUP_NAME, "invalidinstance");
    }

    /**
     * The schedule contains a {@code null} entry. The first read must complete with an exception
     * and no records; a second read must then succeed and return a list equal to the scheduled
     * reference list.
     * NOTE(review): the null entry presumably makes the mock connector throw - confirm in
     * MockConsumerConnector.
     */
    @Test
    public void testConsumerExceptions() throws InterruptedException, ExecutionException {
        final List<ConsumerRecord<byte[], byte[]>> referenceRecords =
                Arrays.<ConsumerRecord<byte[], byte[]>>asList(
                        new BinaryConsumerRecord("k1".getBytes(), "v1".getBytes(), 0, 0L),
                        null,
                        new BinaryConsumerRecord("k2".getBytes(), "v2".getBytes(), 1, 0L),
                        new BinaryConsumerRecord("k3".getBytes(), "v3".getBytes(), 2, 0L));
        expectCreateWithRecords(referenceRecords);
        EasyMock.expect(mdObserver.topicExists(TOPIC_NAME)).andReturn(true).times(2);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId =
                consumerManager.createConsumer(GROUP_NAME, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));

        readAndAwaitCallback(instanceId, TOPIC_NAME, Long.MAX_VALUE,
                new ConsumerManager.ReadCallback<byte[], byte[]>() {
                    @Override
                    public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> records, Exception e) {
                        sawCallback = true;
                        Assert.assertNull(records);
                        Assert.assertNotNull(e);
                    }
                });

        readAndAwaitCallback(instanceId, TOPIC_NAME, Long.MAX_VALUE,
                new ConsumerManager.ReadCallback<byte[], byte[]>() {
                    @Override
                    public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> records, Exception e) {
                        sawCallback = true;
                        Assert.assertNull(e);
                        Assert.assertEquals(referenceRecords, records);
                    }
                });

        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * Issues a read that is expected to fail before any work is scheduled: the callback must
     * already have run with a {@link RestNotFoundException} by the time {@code readTopic}
     * returns, and {@code readTopic} itself must return {@code null}.
     *
     * @param instanceId consumer instance id to read through
     * @param topic topic to read from
     */
    private void readAndExpectImmediateNotFound(String instanceId, String topic) {
        sawCallback = false;
        Future<?> future = consumerManager.readTopic(
                GROUP_NAME, instanceId, topic, BinaryConsumerState.class, Long.MAX_VALUE,
                new ConsumerManager.ReadCallback<byte[], byte[]>() {
                    @Override
                    public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> records, Exception e) {
                        sawCallback = true;
                        Assert.assertNull(records);
                        Assert.assertThat(e, CoreMatchers.instanceOf(RestNotFoundException.class));
                    }
                });
        Assert.assertTrue(sawCallback);
        Assert.assertNull(future);
    }
}
