package io.confluent.kafkarest;

import io.confluent.kafkarest.ConsumerManager;
import io.confluent.kafkarest.entities.BinaryConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.mock.MockConsumerConnector;
import io.confluent.rest.RestConfigException;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestNotFoundException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Function0;

/* loaded from: input_file:io/confluent/kafkarest/ConsumerManagerTest.class */
public class ConsumerManagerTest {
    // Properties used to build the KafkaRestConfig; populated in setUp() and extended by individual tests.
    private Properties properties;
    // Configuration built from the properties by setUp(Properties).
    private KafkaRestConfig config;
    // EasyMock mock of the metadata observer; tests script its topicExists() answers.
    private MetadataObserver mdObserver;
    // EasyMock mock of the factory handed to the manager; expectCreate() scripts its createConsumer() call.
    private ConsumerManager.ConsumerFactory consumerFactory;
    // The manager under test; shut down in tearDown().
    private ConsumerManager consumerManager;
    // Consumer group name used by every test.
    private static final String groupName = "testgroup";
    // Default topic the read helpers consume from.
    private static final String topicName = "testtopic";
    // Second topic, used to check that a further subscription is rejected.
    private static final String secondTopicName = "testtopic2";
    // Set to true by the callbacks the tests pass to the manager.
    private boolean sawCallback = false;
    // Number of records seen by the size-recording read callbacks.
    private int actualLength = 0;
    // Captures the ConsumerConfig passed to the mocked factory; assigned in expectCreate().
    private Capture<ConsumerConfig> capturedConsumerConfig;
    // Results recorded by the callbacks.
    // NOTE(review): these three are static, so a value written by one test is still visible to the
    // next test instance unless that test resets it first.
    private static Exception actualException = null;
    private static List<? extends ConsumerRecord<byte[], byte[]>> actualRecords = null;
    private static List<TopicPartitionOffset> actualOffsets = null;

    /**
     * Builds the baseline properties shared by the tests and creates the fixtures from them.
     *
     * @throws RestConfigException if the configuration cannot be built from the properties
     */
    @Before
    public void setUp() throws RestConfigException {
        Properties baseline = new Properties();
        baseline.setProperty("consumer.request.max.bytes", "1024");
        baseline.setProperty("exclude.internal.topics", "false");
        this.properties = baseline;
        setUp(baseline);
    }

    /**
     * (Re)creates the configuration, the two EasyMock mocks and the manager under test.
     *
     * <p>Several tests call this a second time with adjusted properties. The manager that was
     * created by the earlier call is shut down here, because {@code tearDown()} only ever sees
     * the most recently created instance.
     *
     * @param properties settings used to build the {@link KafkaRestConfig}
     * @throws RestConfigException if the configuration cannot be built from {@code properties}
     */
    public void setUp(Properties properties) throws RestConfigException {
        ConsumerManager previousManager = this.consumerManager;
        this.config = new KafkaRestConfig(properties, new SystemTime());
        this.mdObserver = EasyMock.createMock(MetadataObserver.class);
        this.consumerFactory = EasyMock.createMock(ConsumerManager.ConsumerFactory.class);
        this.consumerManager = new ConsumerManager(this.config, this.mdObserver, this.consumerFactory);
        // Release the instance being replaced only after the new fixtures exist, so a failure
        // above leaves the old manager in place for tearDown() to shut down.
        if (previousManager != null) {
            previousManager.shutdown();
        }
    }

    /** Shuts down the manager under test once a test has finished. */
    @After
    public void tearDown() {
        consumerManager.shutdown();
    }

    /**
     * Scripts the mocked factory to hand out a mock connector for the given per-topic data,
     * using {@code false} for the connector flag and no id.
     *
     * @param map per-topic data passed to the mock connector
     * @return the connector the factory is scripted to return
     */
    private ConsumerConnector expectCreate(Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> map) {
        final boolean connectorFlag = false;
        final String noId = null;
        return expectCreate(map, connectorFlag, noId);
    }

    /**
     * Builds a {@link MockConsumerConnector} for client "testclient" and records an expectation
     * that the mocked factory's {@code createConsumer} returns it. The config passed to the
     * factory is captured into {@code capturedConsumerConfig}.
     *
     * @param map per-topic data passed to the mock connector
     * @param z flag forwarded as the last constructor argument of the mock connector
     * @param str not used by this method
     * @return the connector the factory is scripted to return
     */
    private ConsumerConnector expectCreate(Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> map, boolean z, String str) {
        MockConsumerConnector connector = new MockConsumerConnector(config.getTime(), "testclient", map, 1, z);
        capturedConsumerConfig = new Capture<>();
        ConsumerConfig capturedArgument = EasyMock.capture(capturedConsumerConfig);
        EasyMock.expect(consumerFactory.createConsumer(capturedArgument)).andReturn(connector);
        return connector;
    }

    /**
     * Scripts the factory with a connector whose only entry for the default topic is an empty
     * map, passing {@code true} as the connector flag.
     *
     * @param str forwarded to {@code expectCreate} as its third argument
     * @return the connector the factory is scripted to return
     */
    private ConsumerConnector expectCreateNoData(String str) {
        Map<Integer, List<ConsumerRecord<byte[], byte[]>>> emptySchedule = new HashMap<>();
        Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> topicSchedules = new HashMap<>();
        topicSchedules.put(topicName, Arrays.asList(emptySchedule));
        return expectCreate(topicSchedules, true, str);
    }

    /**
     * Convenience overload of {@code expectCreateNoData(String)} that passes no id.
     *
     * @return the connector the factory is scripted to return
     */
    private ConsumerConnector expectCreateNoData() {
        final String noId = null;
        return expectCreateNoData(noId);
    }

    /**
     * Checks that the "exclude.internal.topics=false" setting from setUp() reaches the
     * ConsumerConfig handed to the consumer factory.
     */
    @Test
    public void testConsumerOverrides() {
        ConsumerConnector connector = new MockConsumerConnector(config.getTime(), "testclient", null, 1, true);
        Capture<ConsumerConfig> configCapture = new Capture<>();
        EasyMock.expect(consumerFactory.createConsumer(EasyMock.capture(configCapture))).andReturn(connector);
        EasyMock.replay(consumerFactory);

        consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));

        ConsumerConfig passedConfig = configCapture.getValue();
        Assert.assertFalse(passedConfig.excludeInternalTopics());
        EasyMock.verify(consumerFactory);
    }

    /**
     * Happy path: create a consumer, read the three scripted records, commit offsets and delete
     * the consumer, verifying the callbacks and the mock expectations along the way.
     */
    @Test
    public void testConsumerNormalOps() throws Exception {
        List<ConsumerRecord<byte[], byte[]>> expected = referenceRecords(3);
        // The key 50 is interpreted by MockConsumerConnector; its meaning is not visible here.
        Map<Integer, List<ConsumerRecord<byte[], byte[]>>> schedule = new HashMap<>();
        schedule.put(50, expected);
        Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> topicSchedules = new HashMap<>();
        topicSchedules.put(topicName, Arrays.asList(schedule));
        expectCreate(topicSchedules);
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId = consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));
        readFromDefault(instanceId);
        verifyRead(expected, null);

        // Reset the recorded state before committing so the assertions only see the commit callback.
        sawCallback = false;
        actualException = null;
        actualOffsets = null;
        ConsumerManager.CommitCallback onCommit = new ConsumerManager.CommitCallback() {
            @Override
            public void onCompletion(List<TopicPartitionOffset> list, Exception exc) {
                ConsumerManagerTest.this.sawCallback = true;
                ConsumerManagerTest.actualException = exc;
                ConsumerManagerTest.actualOffsets = list;
            }
        };
        consumerManager.commitOffsets(groupName, instanceId, onCommit).get();
        Assert.assertTrue("Callback not called", sawCallback);
        Assert.assertNull("Callback exception", actualException);
        Assert.assertNotNull("Callback Offsets", actualOffsets);
        Assert.assertEquals("Callback Offsets Size", 3L, actualOffsets.size());

        consumerManager.deleteConsumer(groupName, instanceId);
        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * With no data scripted, creating a consumer and reading from it must take longer than the
     * configured "consumer.request.timeout.ms" and still complete without an exception.
     */
    @Test
    public void testConsumerRequestTimeoutMs() throws Exception {
        final int requestTimeoutMs = 400;
        properties.setProperty("consumer.request.timeout.ms", Integer.toString(requestTimeoutMs));
        setUp(properties);

        Map<Integer, List<ConsumerRecord<byte[], byte[]>>> emptySchedule = new HashMap<>();
        Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> topicSchedules = new HashMap<>();
        topicSchedules.put(topicName, Arrays.asList(emptySchedule));
        expectCreate(topicSchedules);
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        // The measured interval deliberately includes consumer creation, as well as the read.
        long startedAt = System.currentTimeMillis();
        String instanceId = consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));
        readFromDefault(instanceId);
        long elapsedMs = System.currentTimeMillis() - startedAt;

        Assert.assertTrue(elapsedMs > (long) requestTimeoutMs);
        Assert.assertTrue("Callback failed to fire", sawCallback);
        Assert.assertNull("No exception in callback", actualException);
    }

    /**
     * With "fetch.min.bytes" set to 1 the read must return just the first scripted record and
     * finish in less time than the configured request timeout.
     */
    @Test
    public void testConsumerTimeoutMsMsAndMinBytes() throws Exception {
        properties.setProperty("consumer.request.timeout.ms", "1303");
        properties.setProperty("fetch.min.bytes", "1");
        setUp(properties);

        List<ConsumerRecord<byte[], byte[]>> scripted = referenceRecords(3);
        Map<Integer, List<ConsumerRecord<byte[], byte[]>>> schedule = new HashMap<>();
        schedule.put(50, scripted);
        Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> topicSchedules = new HashMap<>();
        topicSchedules.put(topicName, Arrays.asList(schedule));
        expectCreate(topicSchedules);
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId = consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));
        long startedAt = System.currentTimeMillis();
        readFromDefault(instanceId);

        Assert.assertTrue("Callback failed to fire", sawCallback);
        Assert.assertNull("No exception in callback", actualException);
        Assert.assertEquals("Records returned not as expected", Arrays.asList(scripted.get(0)), actualRecords);
        long elapsedMs = System.currentTimeMillis() - startedAt;
        Assert.assertTrue(elapsedMs < ((long) config.getInt("consumer.request.timeout.ms")));
    }

    /**
     * A per-consumer response-min-bytes of 1 must take effect even though the global
     * "fetch.min.bytes" is 10: the read returns only the first scripted record.
     */
    @Test
    public void testConsumeMinBytesIsOverridablePerConsumer() throws Exception {
        properties.setProperty("fetch.min.bytes", "10");
        setUp(properties);

        List<ConsumerRecord<byte[], byte[]>> scripted = referenceRecords(3);
        Map<Integer, List<ConsumerRecord<byte[], byte[]>>> schedule = new HashMap<>();
        schedule.put(50, scripted);
        Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> topicSchedules = new HashMap<>();
        topicSchedules.put(topicName, Arrays.asList(schedule));
        expectCreate(topicSchedules);
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        ConsumerInstanceConfig instanceConfig = new ConsumerInstanceConfig(EmbeddedFormat.BINARY);
        instanceConfig.setResponseMinBytes(1);
        String instanceId = consumerManager.createConsumer(groupName, instanceConfig);
        readFromDefault(instanceId);

        Assert.assertTrue("Callback failed to fire", sawCallback);
        Assert.assertNull("No exception in callback", actualException);
        Assert.assertEquals("Records returned not as expected", Arrays.asList(scripted.get(0)), actualRecords);
    }

    /**
     * A per-consumer request timeout (111 ms) must win over the global one (1201 ms): an empty
     * read takes longer than the override but less than the global setting.
     */
    @Test
    public void testConsumerRequestTimeoutMsIsOverriddablePerConsumer() throws Exception {
        Integer overrideTimeoutMs = 111;
        Integer globalTimeoutMs = 1201;
        properties.setProperty("consumer.request.timeout.ms", globalTimeoutMs.toString());
        setUp(properties);

        Map<Integer, List<ConsumerRecord<byte[], byte[]>>> emptySchedule = new HashMap<>();
        Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> topicSchedules = new HashMap<>();
        topicSchedules.put(topicName, Arrays.asList(emptySchedule));
        expectCreate(topicSchedules);
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        ConsumerInstanceConfig instanceConfig = new ConsumerInstanceConfig(
                (String) null, (String) null, EmbeddedFormat.BINARY.name(), (String) null, (String) null, (Integer) null, overrideTimeoutMs);
        String instanceId = consumerManager.createConsumer(groupName, instanceConfig);

        long startedAt = System.currentTimeMillis();
        readFromDefault(instanceId);
        long elapsedMs = System.currentTimeMillis() - startedAt;

        Assert.assertTrue(elapsedMs < ((long) globalTimeoutMs.intValue()));
        Assert.assertTrue(elapsedMs > ((long) overrideTimeoutMs.intValue()));
        Assert.assertTrue("Callback failed to fire", sawCallback);
        Assert.assertNull("No exception in callback", actualException);
    }

    /**
     * Verifies response-size limiting. Four 511-byte records are scripted; with the
     * "consumer.request.max.bytes" of 1024 from setUp() the first read must return two records,
     * and a second read with an explicit limit of 512 bytes must return one.
     *
     * <p>The callbacks tolerate a missing record list, so a failed read surfaces through the
     * "Callback received exception" assertion rather than as a NullPointerException thrown
     * inside the callback.
     */
    @Test
    public void testConsumerMaxBytesResponse() throws Exception {
        List<ConsumerRecord<byte[], byte[]>> scripted = Arrays.<ConsumerRecord<byte[], byte[]>>asList(
                new BinaryConsumerRecord(topicName, (byte[]) null, new byte[511], 0, 0L),
                new BinaryConsumerRecord(topicName, (byte[]) null, new byte[511], 1, 0L),
                new BinaryConsumerRecord(topicName, (byte[]) null, new byte[511], 2, 0L),
                new BinaryConsumerRecord(topicName, (byte[]) null, new byte[511], 3, 0L));
        Map<Integer, List<ConsumerRecord<byte[], byte[]>>> schedule = new HashMap<>();
        schedule.put(50, scripted);
        Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> topicSchedules = new HashMap<>();
        topicSchedules.put(topicName, Arrays.asList(schedule));
        expectCreate(topicSchedules);
        // One topicExists() lookup is expected for each of the two reads below.
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true).times(2);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId = consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));

        // First read: limited only by the configured maximum.
        sawCallback = false;
        actualException = null;
        actualLength = 0;
        readTopic(instanceId, new ConsumerReadCallback<byte[], byte[]>() {
            @Override
            public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> list, RestException restException) {
                ConsumerManagerTest.this.sawCallback = true;
                ConsumerManagerTest.actualException = restException;
                ConsumerManagerTest.this.actualLength = (list == null) ? 0 : list.size();
            }
        });
        Assert.assertTrue("Callback failed to fire", sawCallback);
        Assert.assertNull("Callback received exception", actualException);
        Assert.assertEquals("List of records returned incorrect", 2L, actualLength);

        // Second read: explicit 512-byte limit.
        sawCallback = false;
        actualException = null;
        actualLength = 0;
        readTopic(instanceId, 512L, new ConsumerReadCallback<byte[], byte[]>() {
            @Override
            public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> list, RestException restException) {
                ConsumerManagerTest.this.sawCallback = true;
                ConsumerManagerTest.actualException = restException;
                ConsumerManagerTest.this.actualLength = (list == null) ? 0 : list.size();
            }
        });
        Assert.assertTrue("Callback failed to fire", sawCallback);
        Assert.assertNull("Callback received exception", actualException);
        Assert.assertEquals("List of records returned incorrect", 1L, actualLength);

        consumerManager.deleteConsumer(groupName, instanceId);
        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * When an instance config carries both an id and a name, the id must be returned by
     * createConsumer and must be the consumer id in the captured ConsumerConfig.
     */
    @Test
    public void testIDOverridesName() {
        expectCreateNoData("id");
        EasyMock.replay(mdObserver, consumerFactory);

        ConsumerInstanceConfig instanceConfig = new ConsumerInstanceConfig(
                "id", "name", EmbeddedFormat.BINARY.toString(), (String) null, (String) null, (Integer) null, (Integer) null);
        String instanceId = consumerManager.createConsumer(groupName, instanceConfig);

        Assert.assertEquals("id", instanceId);
        ConsumerConfig passedConfig = capturedConsumerConfig.getValue();
        Assert.assertEquals("id", passedConfig.consumerId().getOrElse((Function0) null));
        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * Creating a second consumer with an already-used name in the same group must fail with a
     * RestException carrying error code 40902.
     */
    @Test
    public void testDuplicateConsumerName() {
        expectCreateNoData();
        EasyMock.replay(mdObserver, consumerFactory);

        ConsumerInstanceConfig first = new ConsumerInstanceConfig(
                (String) null, "name", EmbeddedFormat.BINARY.toString(), (String) null, (String) null, (Integer) null, (Integer) null);
        consumerManager.createConsumer(groupName, first);

        try {
            ConsumerInstanceConfig duplicate = new ConsumerInstanceConfig(
                    (String) null, "name", EmbeddedFormat.BINARY.toString(), (String) null, (String) null, (Integer) null, (Integer) null);
            consumerManager.createConsumer(groupName, duplicate);
            Assert.fail("Expected to see exception because consumer already exists");
        } catch (RestException e) {
            Assert.assertEquals(40902L, e.getErrorCode());
        }

        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * A consumer that has already read from one topic must reject a read from a second topic:
     * the first read succeeds with zero records, the second fails with a RestException whose
     * error code is 40901.
     */
    @Test
    public void testMultipleTopicSubscriptionsFail() throws Exception {
        expectCreateNoData();
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true);
        EasyMock.expect(mdObserver.topicExists(secondTopicName)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId = consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));

        // First subscription: valid, but there is no data to return.
        sawCallback = false;
        actualException = null;
        actualRecords = null;
        readTopic(instanceId);
        verifyRead(Collections.<ConsumerRecord<byte[], byte[]>>emptyList(), null);
        Assert.assertTrue("Callback not called", sawCallback);
        Assert.assertNull("Callback exception", actualException);
        Assert.assertEquals("Callback records should be valid but of 0 size", 0L, actualRecords.size());

        // Second subscription on another topic: must be refused.
        sawCallback = false;
        actualException = null;
        actualRecords = null;
        readTopic(instanceId, secondTopicName);
        verifyRead(null, RestException.class);
        // actualException is declared as Exception; verifyRead has just asserted that it is a
        // RestException, so the cast needed to reach getErrorCode() is safe.
        Assert.assertEquals("Callback Exception should be for already subscribed consumer",
                40901L, ((RestException) actualException).getErrorCode());

        consumerManager.deleteConsumer(groupName, instanceId);
        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * With "consumer.iterator.backoff.ms" set to 250, an empty read must leave exactly one
     * entry in the manager's delayed read tasks when sampled after 100 ms and again after a
     * further 100 ms, and must then complete without an exception.
     */
    @Test
    public void testBackoffMsControlsPollCalls() throws Exception {
        Properties backoffProps = new Properties();
        backoffProps.setProperty("consumer.iterator.backoff.ms", "250");
        backoffProps.setProperty("exclude.internal.topics", "false");
        // Rebuild the fixtures through the shared helper instead of repeating its wiring here.
        setUp(backoffProps);

        expectCreateNoData();
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId = consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));
        Future pendingRead = readTopicFuture(instanceId, topicName, Long.MAX_VALUE, new ConsumerReadCallback<byte[], byte[]>() {
            @Override
            public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> list, RestException restException) {
                ConsumerManagerTest.this.sawCallback = true;
                ConsumerManagerTest.actualException = restException;
            }
        });

        // Sample the delayed-task queue twice while the read is still pending.
        Thread.sleep(100L);
        Assert.assertEquals(1L, consumerManager.delayedReadTasks.size());
        Thread.sleep(100L);
        Assert.assertEquals(1L, consumerManager.delayedReadTasks.size());

        pendingRead.get();
        Assert.assertTrue("Callback not called", sawCallback);
        Assert.assertNull("Callback exception should not be populated", actualException);
    }

    /**
     * With "consumer.iterator.backoff.ms" set to 1000, the delayed read task sampled 100 ms
     * after starting an empty read must report a remaining delay below the backoff but above
     * half of it. The read must then complete with an empty record list.
     */
    @Test
    public void testBackoffMsUpdatesReadTaskExpiry() throws Exception {
        final int backoffMs = 1000;
        Properties backoffProps = new Properties();
        backoffProps.setProperty("consumer.iterator.backoff.ms", Integer.toString(backoffMs));
        backoffProps.setProperty("exclude.internal.topics", "false");
        // Rebuild the fixtures through the shared helper instead of repeating its wiring here.
        setUp(backoffProps);

        expectCreateNoData();
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId = consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));
        Future pendingRead = readTopicFuture(instanceId, topicName, Long.MAX_VALUE, new ConsumerReadCallback<byte[], byte[]>() {
            @Override
            public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> list, RestException restException) {
                ConsumerManagerTest.this.sawCallback = true;
                ConsumerManagerTest.actualRecords = list;
                ConsumerManagerTest.actualException = restException;
            }
        });

        Thread.sleep(100L);
        ConsumerManager.RunnableReadTask delayedTask = consumerManager.delayedReadTasks.peek();
        Assert.assertNotNull("Could not get read task in time. It should not be null", delayedTask);

        long remainingMs = delayedTask.getDelay(TimeUnit.MILLISECONDS);
        Assert.assertTrue(remainingMs < (long) backoffMs);
        Assert.assertTrue(((double) remainingMs) > ((double) backoffMs) * 0.5d);

        pendingRead.get();
        verifyRead(Collections.<ConsumerRecord<byte[], byte[]>>emptyList(), null);
    }

    /**
     * The consumer state's {@code expiration} must grow while a read is in progress, again once
     * the read has finished, and once more after committing offsets.
     */
    @Test
    public void testConsumerExpirationIsUpdated() throws Exception {
        expectCreateNoData();
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId = consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));
        ConsumerState state = consumerManager.getConsumerInstance(groupName, instanceId);
        long expirationAtCreate = state.expiration;

        ConsumerReadCallback<byte[], byte[]> onRead = new ConsumerReadCallback<byte[], byte[]>() {
            @Override
            public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> list, RestException restException) {
                ConsumerManagerTest.this.sawCallback = true;
                ConsumerManagerTest.actualRecords = list;
                ConsumerManagerTest.actualException = restException;
            }
        };
        Future pendingRead = readTopicFuture(instanceId, topicName, Long.MAX_VALUE, onRead);

        // Sampled while the read is still pending.
        Thread.sleep(100L);
        Assert.assertTrue(state.expiration > expirationAtCreate);
        long expirationDuringRead = state.expiration;

        pendingRead.get();
        Assert.assertTrue(state.expiration > expirationDuringRead);
        verifyRead(Collections.emptyList(), null);
        long expirationAfterRead = state.expiration;

        ConsumerManager.CommitCallback onCommit = new ConsumerManager.CommitCallback() {
            @Override
            public void onCompletion(List<TopicPartitionOffset> list, Exception exc) {
                ConsumerManagerTest.this.sawCallback = true;
                ConsumerManagerTest.actualException = exc;
                ConsumerManagerTest.actualOffsets = list;
            }
        };
        consumerManager.commitOffsets(groupName, instanceId, onCommit).get();
        Assert.assertTrue(state.expiration > expirationAfterRead);
    }

    /** Reading with an instance id that was never created must report "not found" immediately. */
    @Test
    public void testReadInvalidInstanceFails() {
        final String unknownInstanceId = "invalid";
        readAndExpectImmediateNotFound(unknownInstanceId, topicName);
    }

    /**
     * Reading a topic that the metadata observer reports as missing must report "not found"
     * immediately.
     */
    @Test
    public void testReadInvalidTopicFails() throws Exception, ExecutionException {
        final String missingTopic = "invalidtopic";
        expectCreate(null);
        EasyMock.expect(mdObserver.topicExists(missingTopic)).andReturn(false);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId = consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));
        readAndExpectImmediateNotFound(instanceId, missingTopic);

        EasyMock.verify(mdObserver, consumerFactory);
    }

    /** Deleting an instance that does not exist must throw RestNotFoundException. */
    @Test(expected = RestNotFoundException.class)
    public void testDeleteInvalidConsumer() {
        final String unknownInstanceId = "invalidinstance";
        consumerManager.deleteConsumer(groupName, unknownInstanceId);
    }

    /**
     * Scripts three records followed by a {@code null} entry. The first read is expected to
     * report an exception and no records; a second read on the same consumer is expected to
     * succeed and deliver the scripted list.
     */
    @Test
    public void testConsumerExceptions() throws Exception {
        final List<ConsumerRecord<byte[], byte[]>> scripted = referenceRecords(3);
        // NOTE(review): the trailing null is presumably what makes the mock connector fail the
        // first read; confirm against MockConsumerConnector.
        scripted.add(null);
        Map<Integer, List<ConsumerRecord<byte[], byte[]>>> schedule = new HashMap<>();
        schedule.put(50, scripted);
        Map<String, List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>>> topicSchedules = new HashMap<>();
        topicSchedules.put(topicName, Arrays.asList(schedule));
        expectCreate(topicSchedules);
        EasyMock.expect(mdObserver.topicExists(topicName)).andReturn(true).times(2);
        EasyMock.replay(mdObserver, consumerFactory);

        String instanceId = consumerManager.createConsumer(groupName, new ConsumerInstanceConfig(EmbeddedFormat.BINARY));

        // First read: an exception and no records.
        sawCallback = false;
        actualException = null;
        actualRecords = null;
        readTopic(instanceId);
        Assert.assertTrue("Callback not called", sawCallback);
        Assert.assertNotNull("Callback exception should be populated", actualException);
        Assert.assertNull("Callback with exception should not have any records", actualRecords);

        // Second read: succeeds; the assertions run inside the callback itself.
        sawCallback = false;
        ConsumerReadCallback<byte[], byte[]> onSecondRead = new ConsumerReadCallback<byte[], byte[]>() {
            @Override
            public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> list, RestException restException) {
                ConsumerManagerTest.this.sawCallback = true;
                Assert.assertNull(restException);
                Assert.assertEquals(scripted, list);
            }
        };
        readTopic(instanceId, onSecondRead);
        Assert.assertTrue(sawCallback);

        EasyMock.verify(mdObserver, consumerFactory);
    }

    /**
     * Reads from the default topic with the result-recording callback.
     *
     * @param str consumer instance id
     */
    private void readTopic(String str) throws Exception {
        final String defaultTopic = topicName;
        readTopic(str, defaultTopic);
    }

    /**
     * Reads from the given topic with no byte limit, recording the callback's records and
     * exception in the shared result fields.
     *
     * @param str consumer instance id
     * @param str2 topic to read from
     */
    private void readTopic(String str, String str2) throws Exception {
        ConsumerReadCallback<byte[], byte[]> recorder = new ConsumerReadCallback<byte[], byte[]>() {
            @Override
            public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> list, RestException restException) {
                ConsumerManagerTest.this.sawCallback = true;
                ConsumerManagerTest.actualRecords = list;
                ConsumerManagerTest.actualException = restException;
            }
        };
        readTopic(str, str2, Long.MAX_VALUE, recorder);
    }

    /**
     * Reads from the default topic with no byte limit, using the supplied callback.
     *
     * @param str consumer instance id
     * @param consumerReadCallback callback invoked when the read completes
     */
    private void readTopic(String str, ConsumerReadCallback consumerReadCallback) throws Exception {
        final long unlimitedBytes = Long.MAX_VALUE;
        readTopic(str, topicName, unlimitedBytes, consumerReadCallback);
    }

    /**
     * Reads from the default topic with the given limit, using the supplied callback.
     *
     * @param str consumer instance id
     * @param j limit forwarded to the manager's readTopic call
     * @param consumerReadCallback callback invoked when the read completes
     */
    private void readTopic(String str, long j, ConsumerReadCallback consumerReadCallback) throws Exception {
        final String defaultTopic = topicName;
        readTopic(str, defaultTopic, j, consumerReadCallback);
    }

    /**
     * Starts a read and blocks until it completes, waiting at most 1100 ms.
     *
     * @param str consumer instance id
     * @param str2 topic to read from
     * @param j limit forwarded to the manager's readTopic call
     * @param consumerReadCallback callback invoked when the read completes
     */
    private void readTopic(String str, String str2, long j, ConsumerReadCallback consumerReadCallback) throws Exception {
        // 1000 ms plus a 10% margin. NOTE(review): 1000 presumably mirrors the default request
        // timeout; confirm against KafkaRestConfig.
        long maxWaitMs = (long) (1000 * 1.1d);
        Future pendingRead = readTopicFuture(str, str2, j, consumerReadCallback);
        pendingRead.get(maxWaitMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Issues a binary-format read against the manager for the test group, without waiting.
     *
     * @param str consumer instance id
     * @param str2 topic to read from
     * @param j limit forwarded to the manager's readTopic call
     * @param consumerReadCallback callback invoked when the read completes
     * @return whatever the manager's readTopic call returns
     */
    private Future readTopicFuture(String str, String str2, long j, ConsumerReadCallback consumerReadCallback) {
        Future pendingRead = consumerManager.readTopic(
                groupName, str, str2, BinaryConsumerState.class, j, consumerReadCallback);
        return pendingRead;
    }

    /**
     * Builds {@code i} binary records for the default topic. Record number {@code n} (counting
     * from 1) has key "k&lt;n&gt;" and value "v&lt;n&gt;", with {@code n - 1} and {@code 0} as
     * its last two constructor arguments.
     *
     * <p>Keys and values are encoded as UTF-8 explicitly, so the bytes do not depend on the
     * platform default charset.
     *
     * @param i number of records to create
     * @return a new mutable list holding the records in creation order
     */
    private List<ConsumerRecord<byte[], byte[]>> referenceRecords(int i) {
        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(Math.max(i, 0));
        for (int index = 0; index < i; index++) {
            int ordinal = index + 1;
            byte[] key = ("k" + ordinal).getBytes(StandardCharsets.UTF_8);
            byte[] value = ("v" + ordinal).getBytes(StandardCharsets.UTF_8);
            records.add(new BinaryConsumerRecord(topicName, key, value, index, 0L));
        }
        return records;
    }

    /**
     * Clears the recorded results, reads from the default topic with no byte limit and blocks
     * (without a timeout) until the read has completed.
     *
     * @param str consumer instance id
     */
    private void readFromDefault(String str) throws InterruptedException, ExecutionException {
        sawCallback = false;
        actualException = null;
        actualRecords = null;
        ConsumerReadCallback<byte[], byte[]> recorder = new ConsumerReadCallback<byte[], byte[]>() {
            @Override
            public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> list, RestException restException) {
                ConsumerManagerTest.actualException = restException;
                ConsumerManagerTest.actualRecords = list;
                ConsumerManagerTest.this.sawCallback = true;
            }
        };
        Future pendingRead = consumerManager.readTopic(
                groupName, str, topicName, BinaryConsumerState.class, Long.MAX_VALUE, recorder);
        pendingRead.get();
    }

    /**
     * Issues a read that is expected to be rejected synchronously: by the time the read call
     * returns, the callback must already have fired with a RestNotFoundException and no
     * records, and the call itself must have returned {@code null}.
     *
     * @param str consumer instance id
     * @param str2 topic to read from
     */
    private void readAndExpectImmediateNotFound(String str, String str2) {
        sawCallback = false;
        actualRecords = null;
        actualException = null;
        ConsumerReadCallback<byte[], byte[]> recorder = new ConsumerReadCallback<byte[], byte[]>() {
            @Override
            public void onCompletion(List<? extends ConsumerRecord<byte[], byte[]>> list, RestException restException) {
                ConsumerManagerTest.this.sawCallback = true;
                ConsumerManagerTest.actualRecords = list;
                ConsumerManagerTest.actualException = restException;
            }
        };
        Future pendingRead = readTopicFuture(str, str2, Long.MAX_VALUE, recorder);

        Assert.assertTrue("Callback not called", sawCallback);
        Assert.assertNull("Callback records", actualRecords);
        Assert.assertThat("Callback exception is RestNotFound", actualException, CoreMatchers.instanceOf(RestNotFoundException.class));
        Assert.assertNull(pendingRead);
    }

    /**
     * Asserts on the state recorded by the last read callback.
     *
     * @param list records the callback should have received, or {@code null} if it should have
     *     received none
     * @param cls type the recorded exception must be an instance of, or {@code null} if no
     *     exception is expected
     */
    private void verifyRead(List<? extends ConsumerRecord<byte[], byte[]>> list, Class cls) {
        Assert.assertTrue("Callback was not called", sawCallback);

        if (list != null) {
            Assert.assertEquals("Callback records not as expected", list, actualRecords);
        } else {
            Assert.assertNull("Callback records should be null", actualRecords);
        }

        if (cls != null) {
            Assert.assertNotNull(actualException);
            Assert.assertThat("Callback exception is not as expected", actualException, CoreMatchers.instanceOf(cls));
        } else {
            Assert.assertNull("Exception is not null", actualException);
        }
    }
}
