package io.confluent.kafkarest.v2;

import com.google.protobuf.ByteString;
import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.SystemTime;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.entities.v2.ConsumerOffsetCommitRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(EasyMockRunner.class)
/* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManagerTest.class */
public class KafkaConsumerManagerTest {
    private static final String TOPIC = "topic";
    private static final int PARTITION = 1;
    private static final long BEGINNING_OFFSET = 10;
    private static final long END_OFFSET = 20;
    private static final long OFFSET_AT_TIME = 30;
    private KafkaRestConfig config;

    @Mock
    private KafkaConsumerManager.KafkaConsumerFactory consumerFactory;
    private KafkaConsumerManager consumerManager;
    private static final String groupName = "testgroup";
    private static final String topicName = "testtopic";
    private boolean sawCallback = false;
    private Capture<Properties> capturedConsumerConfig;
    private MockConsumer<byte[], byte[]> consumer;
    private static final Instant OFFSET_TIMESTAMP = Instant.ofEpochMilli(1000);
    private static Exception actualException = null;
    private static List<ConsumerRecord<ByteString, ByteString>> actualRecords = null;
    private static List<TopicPartitionOffset> actualOffsets = null;

    @Before
    public void setUp() {
        setUpConsumer(setUpProperties());
    }

    private void setUpConsumer(Properties properties) {
        this.config = new KafkaRestConfig(properties, new SystemTime());
        this.consumerManager = new KafkaConsumerManager(this.config, this.consumerFactory);
        this.consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST, groupName);
    }

    private Properties setUpProperties() {
        return setUpProperties(null);
    }

    private Properties setUpProperties(Properties properties) {
        if (properties == null) {
            properties = new Properties();
        }
        properties.setProperty("bootstrap.servers", "PLAINTEXT://hostname:9092");
        properties.setProperty("consumer.request.max.bytes", "1024");
        properties.setProperty("consumer.threads", "1");
        properties.setProperty("consumer.exclude.internal.topics", "false");
        return properties;
    }

    @After
    public void tearDown() {
        this.consumerManager.shutdown();
    }

    private void expectCreate(MockConsumer mockConsumer) {
        this.capturedConsumerConfig = Capture.newInstance();
        EasyMock.expect(this.consumerFactory.createConsumer((Properties) EasyMock.capture(this.capturedConsumerConfig))).andStubReturn(mockConsumer);
        EasyMock.replay(new Object[]{this.consumerFactory});
    }

    @Test
    public void getBeginningOffset_returnsBeginningOffset() {
        expectCreate(this.consumer);
        this.consumer.updateBeginningOffsets(Collections.singletonMap(new TopicPartition(TOPIC, 1), Long.valueOf(BEGINNING_OFFSET)));
        Assert.assertEquals(BEGINNING_OFFSET, this.consumerManager.getBeginningOffset(TOPIC, 1));
    }

    @Test
    public void getEndOffset_returnsEndOffset() {
        expectCreate(this.consumer);
        this.consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(TOPIC, 1), Long.valueOf(END_OFFSET)));
        Assert.assertEquals(END_OFFSET, this.consumerManager.getEndOffset(TOPIC, 1));
    }

    @Test
    public void getOffsetForTime_returnsOffset() {
        expectCreate(this.consumer);
        this.consumer.updateOffsetForTime(TOPIC, 1, OFFSET_AT_TIME, OFFSET_TIMESTAMP);
        Assert.assertEquals(Long.valueOf(OFFSET_AT_TIME), this.consumerManager.getOffsetForTime(TOPIC, 1, OFFSET_TIMESTAMP).get());
    }

    @Test
    public void testConsumerOverrides() {
        Capture newInstance = Capture.newInstance();
        EasyMock.expect(this.consumerFactory.createConsumer((Properties) EasyMock.capture(newInstance))).andReturn(this.consumer);
        EasyMock.replay(new Object[]{this.consumerFactory});
        this.consumerManager.createConsumer(groupName, ConsumerInstanceConfig.create(EmbeddedFormat.BINARY));
        Assert.assertEquals("false", ((Properties) newInstance.getValue()).get("exclude.internal.topics"));
        EasyMock.verify(new Object[]{this.consumerFactory});
    }

    @Test
    public void testConsumerRequestTimeoutms() throws Exception {
        Properties upProperties = setUpProperties(new Properties());
        upProperties.setProperty("consumer.request.timeout.ms", "2500");
        setUpConsumer(upProperties);
        expectCreate(this.consumer);
        String createConsumer = this.consumerManager.createConsumer(groupName, ConsumerInstanceConfig.create(EmbeddedFormat.BINARY));
        this.consumerManager.subscribe(groupName, createConsumer, new ConsumerSubscriptionRecord(Collections.singletonList(topicName), (String) null));
        readFromDefault(createConsumer);
        Thread.sleep((long) (this.config.getInt("consumer.request.timeout.ms").intValue() * 0.5d));
        Assert.assertFalse("Callback failed early", this.sawCallback);
        Thread.sleep((long) (this.config.getInt("consumer.request.timeout.ms").intValue() * 0.7d));
        Assert.assertTrue("Callback failed to fire", this.sawCallback);
        Assert.assertNull("No exception in callback", actualException);
    }

    @Test
    public void testConsumerWaitMs() throws Exception {
        Properties upProperties = setUpProperties(new Properties());
        Integer num = 400;
        upProperties.setProperty("consumer.request.timeout.ms", num.toString());
        setUpConsumer(upProperties);
        expectCreate(this.consumer);
        schedulePoll();
        String createConsumer = this.consumerManager.createConsumer(groupName, ConsumerInstanceConfig.create(EmbeddedFormat.BINARY));
        this.consumerManager.subscribe(groupName, createConsumer, new ConsumerSubscriptionRecord(Collections.singletonList(topicName), (String) null));
        this.consumer.rebalance(Collections.singletonList(new TopicPartition(topicName, 0)));
        this.consumer.updateBeginningOffsets(Collections.singletonMap(new TopicPartition(topicName, 0), 0L));
        readFromDefault(createConsumer);
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < num.intValue()) {
            Assert.assertFalse(this.sawCallback);
            Thread.sleep(40L);
        }
        Thread.sleep((long) (num.intValue() * 0.5d));
        Assert.assertTrue("Callback failed to fire", this.sawCallback);
        Assert.assertNull("No exception in callback", actualException);
    }

    @Test
    public void testConsumerRequestTimeoutmsAndMinBytes() throws Exception {
        Properties upProperties = setUpProperties(new Properties());
        upProperties.setProperty("consumer.request.timeout.ms", "1303");
        upProperties.setProperty("fetch.min.bytes", "5");
        setUpConsumer(upProperties);
        expectCreate(this.consumer);
        String createConsumer = this.consumerManager.createConsumer(groupName, ConsumerInstanceConfig.create(EmbeddedFormat.BINARY));
        this.consumerManager.subscribe(groupName, createConsumer, new ConsumerSubscriptionRecord(Collections.singletonList(topicName), (String) null));
        this.consumer.rebalance(Collections.singletonList(new TopicPartition(topicName, 0)));
        this.consumer.updateBeginningOffsets(Collections.singletonMap(new TopicPartition(topicName, 0), 0L));
        long currentTimeMillis = System.currentTimeMillis();
        readFromDefault(createConsumer);
        int intValue = this.config.getInt("consumer.request.timeout.ms").intValue();
        while (System.currentTimeMillis() - currentTimeMillis < intValue) {
            Assert.assertFalse(this.sawCallback);
            Thread.sleep(100L);
        }
        Thread.sleep(200L);
        Assert.assertTrue("Callback failed to fire", this.sawCallback);
        Assert.assertNull("No exception in callback", actualException);
        Assert.assertTrue("Records returned not empty", actualRecords.isEmpty());
        this.sawCallback = false;
        List<ConsumerRecord<ByteString, ByteString>> schedulePoll = schedulePoll();
        readFromDefault(createConsumer);
        Thread.sleep(intValue / 2);
        Assert.assertEquals("Records returned not as expected", schedulePoll, actualRecords);
        Assert.assertTrue("Callback not called", this.sawCallback);
        Assert.assertNull("Callback exception", actualException);
    }

    @Test
    public void testConsumerMinAndMaxBytes() throws Exception {
        ConsumerRecord<ByteString, ByteString> binaryConsumerRecord = binaryConsumerRecord(0);
        int size = ((ByteString) binaryConsumerRecord.getKey()).size() + ((ByteString) binaryConsumerRecord.getValue()).size();
        Properties upProperties = setUpProperties(new Properties());
        upProperties.setProperty("fetch.min.bytes", Integer.toString(size));
        upProperties.setProperty("consumer.request.max.bytes", Integer.toString(size * 10));
        setUpConsumer(upProperties);
        List<ConsumerRecord<ByteString, ByteString>> schedulePoll = schedulePoll();
        List asList = Arrays.asList(schedulePoll.get(0), schedulePoll.get(1), schedulePoll.get(2));
        schedulePoll(3);
        expectCreate(this.consumer);
        String createConsumer = this.consumerManager.createConsumer(groupName, ConsumerInstanceConfig.create(EmbeddedFormat.BINARY));
        this.consumerManager.subscribe(groupName, createConsumer, new ConsumerSubscriptionRecord(Collections.singletonList(topicName), (String) null));
        this.consumer.rebalance(Collections.singletonList(new TopicPartition(topicName, 0)));
        this.consumer.updateBeginningOffsets(Collections.singletonMap(new TopicPartition(topicName, 0), 0L));
        readFromDefault(createConsumer);
        Thread.sleep((long) (Integer.parseInt("1000") * 0.5d));
        Assert.assertTrue("Callback failed to fire", this.sawCallback);
        Assert.assertNull("No exception in callback", actualException);
        Assert.assertEquals("Records returned not as expected", asList, actualRecords);
    }

    @Test
    public void testConsumeMinBytesIsOverridablePerConsumer() throws Exception {
        ConsumerRecord<ByteString, ByteString> binaryConsumerRecord = binaryConsumerRecord(0);
        int size = ((ByteString) binaryConsumerRecord.getKey()).size() + ((ByteString) binaryConsumerRecord.getValue()).size();
        Properties upProperties = setUpProperties(new Properties());
        upProperties.setProperty("fetch.min.bytes", Integer.toString(size * 5));
        upProperties.setProperty("consumer.request.max.bytes", Integer.toString(size * 6));
        setUpConsumer(upProperties);
        List<ConsumerRecord<ByteString, ByteString>> schedulePoll = schedulePoll();
        List asList = Arrays.asList(schedulePoll.get(0), schedulePoll.get(1), schedulePoll.get(2));
        schedulePoll(3);
        expectCreate(this.consumer);
        String createConsumer = this.consumerManager.createConsumer(groupName, ConsumerInstanceConfig.create((String) null, (String) null, EmbeddedFormat.BINARY, (String) null, (String) null, Integer.valueOf(size * 2), (Integer) null));
        this.consumerManager.subscribe(groupName, createConsumer, new ConsumerSubscriptionRecord(Collections.singletonList(topicName), (String) null));
        this.consumer.rebalance(Collections.singletonList(new TopicPartition(topicName, 0)));
        this.consumer.updateBeginningOffsets(Collections.singletonMap(new TopicPartition(topicName, 0), 0L));
        readFromDefault(createConsumer);
        Thread.sleep((long) (Integer.parseInt("1000") * 0.5d));
        Assert.assertTrue("Callback failed to fire", this.sawCallback);
        Assert.assertNull("No exception in callback", actualException);
        Assert.assertEquals("Records returned not as expected", asList, actualRecords);
    }

    @Test
    public void testConsumerNormalOps() throws InterruptedException, ExecutionException {
        List<ConsumerRecord<ByteString, ByteString>> bootstrapConsumer = bootstrapConsumer(this.consumer);
        this.sawCallback = false;
        actualException = null;
        actualRecords = null;
        readFromDefault(this.consumer.cid());
        Thread.sleep((long) (Integer.parseInt("1000") * 1.1d));
        Assert.assertTrue("Callback failed to fire", this.sawCallback);
        Assert.assertNull("No exception in callback", actualException);
        Assert.assertEquals("Records returned not as expected", bootstrapConsumer, actualRecords);
        this.sawCallback = false;
        actualException = null;
        actualOffsets = null;
        this.consumerManager.commitOffsets(groupName, this.consumer.cid(), (String) null, (ConsumerOffsetCommitRequest) null, new KafkaConsumerManager.CommitCallback() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.1
            public void onCompletion(List<TopicPartitionOffset> list, Exception exc) {
                KafkaConsumerManagerTest.this.sawCallback = true;
                Exception unused = KafkaConsumerManagerTest.actualException = exc;
                List unused2 = KafkaConsumerManagerTest.actualOffsets = list;
            }
        }).get();
        Assert.assertTrue("Callback not called", this.sawCallback);
        Assert.assertNull("Callback exception", actualException);
        Assert.assertNotNull("Callback Offsets", actualOffsets);
        this.consumerManager.deleteConsumer(groupName, this.consumer.cid());
    }

    @Test
    public void testBackoffMsControlsPollCalls() throws Exception {
        Properties upProperties = setUpProperties();
        upProperties.put("consumer.request.timeout.ms", String.valueOf(5000L));
        upProperties.put("consumer.iterator.backoff.ms", String.valueOf(500L));
        setUpConsumer(upProperties);
        bootstrapConsumer(this.consumer);
        final CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.consumer.schedulePollTask(new Runnable() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.2
            @Override // java.lang.Runnable
            public void run() {
                copyOnWriteArrayList.add(Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())));
                KafkaConsumerManagerTest.this.consumer.schedulePollTask(this);
            }
        });
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.consumerManager.readRecords(groupName, this.consumer.cid(), BinaryKafkaConsumerState.class, -1L, Long.MAX_VALUE, (list, exc) -> {
            countDownLatch.countDown();
        });
        countDownLatch.await();
        Assert.assertTrue(String.format("Expected at least 2 poll calls, but got %d instead.", Integer.valueOf(copyOnWriteArrayList.size())), copyOnWriteArrayList.size() >= 2);
        for (int i = 1; i < copyOnWriteArrayList.size() - 1; i++) {
            int i2 = 1;
            long longValue = ((Long) copyOnWriteArrayList.get(i)).longValue();
            for (int i3 = i + 1; i3 < copyOnWriteArrayList.size() && ((Long) copyOnWriteArrayList.get(i3)).longValue() - ((Long) copyOnWriteArrayList.get(i)).longValue() <= 500; i3++) {
                i2++;
                longValue = ((Long) copyOnWriteArrayList.get(i3)).longValue();
            }
            Assert.assertTrue(String.format("Expected at most 2 poll calls in window [%d, %d], but got %d instead.", copyOnWriteArrayList.get(i), Long.valueOf(longValue), Integer.valueOf(i2)), i2 <= 2);
            Assert.assertTrue(String.format("Expected at least 1 poll call in window (%d, %d), but got none instead.", copyOnWriteArrayList.get(i), copyOnWriteArrayList.get(i + 1)), ((Long) copyOnWriteArrayList.get(i + 1)).longValue() - ((Long) copyOnWriteArrayList.get(i)).longValue() <= 2 * 500);
        }
        long longValue2 = ((Long) copyOnWriteArrayList.get(copyOnWriteArrayList.size() - 1)).longValue();
        long longValue3 = ((Long) copyOnWriteArrayList.get(0)).longValue() + 5000;
        Assert.assertTrue(String.format("Expected at least 1 poll call in window (%d, %d], but got none instead.", Long.valueOf(longValue2), Long.valueOf(longValue3)), longValue3 - longValue2 < 2 * 500);
    }

    @Test
    public void testBackoffMsUpdatesReadTaskExpiry() throws Exception {
        Properties upProperties = setUpProperties();
        upProperties.put("consumer.iterator.backoff.ms", "1000");
        this.config = new KafkaRestConfig(upProperties, new SystemTime());
        this.consumerManager = new KafkaConsumerManager(this.config, this.consumerFactory);
        this.consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST, groupName);
        bootstrapConsumer(this.consumer);
        this.consumerManager.readRecords(groupName, this.consumer.cid(), BinaryKafkaConsumerState.class, -1L, Long.MAX_VALUE, new ConsumerReadCallback<ByteString, ByteString>() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.3
            public void onCompletion(List<ConsumerRecord<ByteString, ByteString>> list, Exception exc) {
                Exception unused = KafkaConsumerManagerTest.actualException = exc;
                List unused2 = KafkaConsumerManagerTest.actualRecords = list;
                KafkaConsumerManagerTest.this.sawCallback = true;
            }
        });
        Thread.sleep(100L);
        KafkaConsumerManager.RunnableReadTask peek = this.consumerManager.delayedReadTasks.peek();
        if (peek == null) {
            Assert.fail("Could not get read task in time. It should not be null");
        }
        long delay = peek.getDelay(TimeUnit.MILLISECONDS);
        Assert.assertTrue(delay < 1000);
        Assert.assertTrue(delay > 700);
    }

    @Test
    public void testConsumerExpirationIsUpdated() throws Exception {
        bootstrapConsumer(this.consumer);
        KafkaConsumerState consumerInstance = this.consumerManager.getConsumerInstance(groupName, this.consumer.cid());
        long j = consumerInstance.expiration;
        this.consumerManager.readRecords(groupName, this.consumer.cid(), BinaryKafkaConsumerState.class, -1L, Long.MAX_VALUE, new ConsumerReadCallback<ByteString, ByteString>() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.4
            public void onCompletion(List<ConsumerRecord<ByteString, ByteString>> list, Exception exc) {
                Exception unused = KafkaConsumerManagerTest.actualException = exc;
                List unused2 = KafkaConsumerManagerTest.actualRecords = list;
                KafkaConsumerManagerTest.this.sawCallback = true;
            }
        });
        Thread.sleep(100L);
        Assert.assertFalse(consumerInstance.expired(j));
        long j2 = consumerInstance.expiration;
        awaitRead();
        Assert.assertTrue("Callback failed to fire", this.sawCallback);
        Assert.assertFalse(consumerInstance.expired(j2));
        long j3 = consumerInstance.expiration;
        this.consumerManager.commitOffsets(groupName, this.consumer.cid(), (String) null, (ConsumerOffsetCommitRequest) null, new KafkaConsumerManager.CommitCallback() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.5
            public void onCompletion(List<TopicPartitionOffset> list, Exception exc) {
                KafkaConsumerManagerTest.this.sawCallback = true;
                Exception unused = KafkaConsumerManagerTest.actualException = exc;
                List unused2 = KafkaConsumerManagerTest.actualOffsets = list;
            }
        }).get();
        Assert.assertFalse(consumerInstance.expired(j3));
    }

    private void awaitRead() throws InterruptedException {
        Thread.sleep((long) (Integer.parseInt("1000") * 1.1d));
    }

    @Test
    @Ignore
    public void testReadRecordsPopulatesDelayedReadTaskWhenExecutorFull() throws Exception {
        Properties upProperties = setUpProperties();
        upProperties.setProperty("consumer.iterator.backoff.ms", "1");
        this.config = new KafkaRestConfig(upProperties, new SystemTime());
        this.consumerManager = new KafkaConsumerManager(this.config, this.consumerFactory);
        MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST, "a");
        MockConsumer<byte[], byte[]> mockConsumer2 = new MockConsumer<>(OffsetResetStrategy.EARLIEST, "b");
        MockConsumer<byte[], byte[]> mockConsumer3 = new MockConsumer<>(OffsetResetStrategy.EARLIEST, "c");
        this.capturedConsumerConfig = Capture.newInstance();
        EasyMock.expect(this.consumerFactory.createConsumer((Properties) EasyMock.capture(this.capturedConsumerConfig))).andReturn(mockConsumer).andReturn(mockConsumer2).andReturn(mockConsumer3);
        EasyMock.replay(new Object[]{this.consumerFactory});
        bootstrapConsumer(mockConsumer, false);
        bootstrapConsumer(mockConsumer2, false);
        bootstrapConsumer(mockConsumer3, false);
        ConsumerReadCallback<ByteString, ByteString> consumerReadCallback = new ConsumerReadCallback<ByteString, ByteString>() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.6
            public void onCompletion(List<ConsumerRecord<ByteString, ByteString>> list, Exception exc) {
                Exception unused = KafkaConsumerManagerTest.actualException = exc;
                List unused2 = KafkaConsumerManagerTest.actualRecords = list;
                KafkaConsumerManagerTest.this.sawCallback = true;
            }
        };
        this.consumerManager.readRecords("a", mockConsumer.cid(), BinaryKafkaConsumerState.class, -1L, Long.MAX_VALUE, consumerReadCallback);
        this.consumerManager.readRecords("a", mockConsumer2.cid(), BinaryKafkaConsumerState.class, -1L, Long.MAX_VALUE, consumerReadCallback);
        Thread.sleep(10000L);
        Iterator it = this.consumerManager.delayedReadTasks.iterator();
        while (it.hasNext()) {
            long delay = ((KafkaConsumerManager.RunnableReadTask) it.next()).getDelay(TimeUnit.MILLISECONDS);
            Assert.assertTrue(delay > END_OFFSET);
            Assert.assertTrue(delay < 75);
        }
        Assert.assertEquals(1L, this.consumerManager.delayedReadTasks.size());
        this.consumerManager.readRecords("c", mockConsumer3.cid(), BinaryKafkaConsumerState.class, -1L, Long.MAX_VALUE, consumerReadCallback);
        Assert.assertEquals(2L, this.consumerManager.delayedReadTasks.size());
        Iterator it2 = this.consumerManager.delayedReadTasks.iterator();
        while (it2.hasNext()) {
            long delay2 = ((KafkaConsumerManager.RunnableReadTask) it2.next()).getDelay(TimeUnit.MILLISECONDS);
            Assert.assertTrue(delay2 > END_OFFSET);
            Assert.assertTrue(delay2 < 75);
        }
    }

    private List<ConsumerRecord<ByteString, ByteString>> bootstrapConsumer(MockConsumer<byte[], byte[]> mockConsumer) {
        return bootstrapConsumer(mockConsumer, true);
    }

    private List<ConsumerRecord<ByteString, ByteString>> bootstrapConsumer(MockConsumer<byte[], byte[]> mockConsumer, boolean z) {
        List<ConsumerRecord<ByteString, ByteString>> asList = Arrays.asList(ConsumerRecord.create(topicName, ByteString.copyFromUtf8("k1"), ByteString.copyFromUtf8("v1"), 0, 0L), ConsumerRecord.create(topicName, ByteString.copyFromUtf8("k2"), ByteString.copyFromUtf8("v2"), 0, 1L), ConsumerRecord.create(topicName, ByteString.copyFromUtf8("k3"), ByteString.copyFromUtf8("v3"), 0, 2L));
        if (z) {
            expectCreate(mockConsumer);
        }
        String createConsumer = this.consumerManager.createConsumer(mockConsumer.groupName, ConsumerInstanceConfig.create(EmbeddedFormat.BINARY));
        mockConsumer.cid(createConsumer);
        this.consumerManager.subscribe(mockConsumer.groupName, createConsumer, new ConsumerSubscriptionRecord(Collections.singletonList(topicName), (String) null));
        mockConsumer.rebalance(Collections.singletonList(new TopicPartition(topicName, 0)));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(new TopicPartition(topicName, 0), 0L));
        for (ConsumerRecord<ByteString, ByteString> consumerRecord : asList) {
            mockConsumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord(consumerRecord.getTopic(), consumerRecord.getPartition(), consumerRecord.getOffset(), ((ByteString) consumerRecord.getKey()).toByteArray(), ((ByteString) consumerRecord.getValue()).toByteArray()));
        }
        return asList;
    }

    private void readFromDefault(String str) throws InterruptedException, ExecutionException {
        this.consumerManager.readRecords(groupName, str, BinaryKafkaConsumerState.class, -1L, Long.MAX_VALUE, new ConsumerReadCallback<ByteString, ByteString>() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.7
            public void onCompletion(List<ConsumerRecord<ByteString, ByteString>> list, Exception exc) {
                Exception unused = KafkaConsumerManagerTest.actualException = exc;
                List unused2 = KafkaConsumerManagerTest.actualRecords = list;
                KafkaConsumerManagerTest.this.sawCallback = true;
            }
        });
    }

    private List<ConsumerRecord<ByteString, ByteString>> schedulePoll() {
        return schedulePoll(0);
    }

    private List<ConsumerRecord<ByteString, ByteString>> schedulePoll(final int i) {
        this.consumer.schedulePollTask(new Runnable() { // from class: io.confluent.kafkarest.v2.KafkaConsumerManagerTest.8
            @Override // java.lang.Runnable
            public void run() {
                KafkaConsumerManagerTest.this.consumer.addRecord(KafkaConsumerManagerTest.this.record(i));
                KafkaConsumerManagerTest.this.consumer.addRecord(KafkaConsumerManagerTest.this.record(i + 1));
                KafkaConsumerManagerTest.this.consumer.addRecord(KafkaConsumerManagerTest.this.record(i + 2));
            }
        });
        return Arrays.asList(binaryConsumerRecord(i), binaryConsumerRecord(i + 1), binaryConsumerRecord(i + 2));
    }

    private ConsumerRecord<ByteString, ByteString> binaryConsumerRecord(int i) {
        return ConsumerRecord.create(topicName, ByteString.copyFromUtf8(String.format("k%d", Integer.valueOf(i))), ByteString.copyFromUtf8(String.format("v%d", Integer.valueOf(i))), 0, i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public org.apache.kafka.clients.consumer.ConsumerRecord<byte[], byte[]> record(int i) {
        return new org.apache.kafka.clients.consumer.ConsumerRecord<>(topicName, 0, i, String.format("k%d", Integer.valueOf(i)).getBytes(), String.format("v%d", Integer.valueOf(i)).getBytes());
    }
}
