package io.confluent.kafkarest.v2;

import com.google.protobuf.ByteString;
import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.SystemTime;
import io.confluent.kafkarest.Time;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.IExpectationSetters;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(EasyMockRunner.class)
/* loaded from: input_file:io/confluent/kafkarest/v2/LoadTest.class */
public class LoadTest {
    private KafkaRestConfig config;

    @Mock
    private KafkaConsumerManager.KafkaConsumerFactory consumerFactory;
    private KafkaConsumerManager consumerManager;
    private static final String topicName = "testtopic";
    private Capture<Properties> capturedConsumerConfig;
    private long requestTimeoutMs = 1000;
    private Random random = new Random();

    /* loaded from: input_file:io/confluent/kafkarest/v2/LoadTest$ConsumerTestRun.class */
    class ConsumerTestRun {
        private final MockConsumer consumer;
        private final Time time;
        private ReentrantLock lock;
        private Condition cond;
        private volatile boolean sawCallback;
        private volatile List<ConsumerRecord<byte[], byte[]>> actualRecords;
        private volatile Exception actualException;
        private ConsumerReadCallback callback;
        private int latestOffset;
        private long readStartMs;

        ConsumerTestRun(LoadTest loadTest, MockConsumer mockConsumer) {
            this(mockConsumer, new SystemTime());
        }

        ConsumerTestRun(MockConsumer mockConsumer, Time time) {
            this.lock = new ReentrantLock();
            this.cond = this.lock.newCondition();
            this.sawCallback = false;
            this.actualRecords = null;
            this.latestOffset = 0;
            this.consumer = mockConsumer;
            this.time = time;
            this.readStartMs = 2147483647L;
            this.sawCallback = false;
            this.callback = new ConsumerReadCallback<byte[], byte[]>() { // from class: io.confluent.kafkarest.v2.LoadTest.ConsumerTestRun.1
                public void onCompletion(List<ConsumerRecord<byte[], byte[]>> list, Exception exc) {
                    ConsumerTestRun.this.lock.lock();
                    try {
                        ConsumerTestRun.this.sawCallback = true;
                        ConsumerTestRun.this.actualRecords = list;
                        ConsumerTestRun.this.actualException = exc;
                        ConsumerTestRun.this.cond.signalAll();
                    } finally {
                        ConsumerTestRun.this.lock.unlock();
                    }
                }
            };
        }

        void bootstrap() {
            LoadTest.this.bootstrapConsumer(this.consumer);
        }

        void read() {
            Assert.assertNull(this.actualException);
            Assert.assertNull(this.actualRecords);
            Assert.assertFalse(this.sawCallback);
            schedulePoll();
            LoadTest.this.consumerManager.readRecords(this.consumer.groupName, this.consumer.cid(), BinaryKafkaConsumerState.class, -1L, Long.MAX_VALUE, this.callback);
            this.readStartMs = this.time.milliseconds();
        }

        void awaitRead() throws InterruptedException {
            this.lock.lock();
            while (!this.sawCallback) {
                try {
                    this.cond.await();
                } finally {
                    this.lock.unlock();
                }
            }
        }

        void verifyRead() {
            Assert.assertTrue("Callback failed to fire", this.sawCallback);
            Assert.assertNull("There shouldn't be an exception in callback", this.actualException);
            Assert.assertEquals("Records returned not as expected", referenceRecords(), this.actualRecords);
            this.lock.lock();
            try {
                this.sawCallback = false;
                this.actualRecords = null;
                this.actualException = null;
            } finally {
                this.lock.unlock();
            }
        }

        private List<ConsumerRecord<ByteString, ByteString>> referenceRecords() {
            return Arrays.asList(ConsumerRecord.create(LoadTest.topicName, ByteString.copyFromUtf8("k1"), ByteString.copyFromUtf8("v1"), 0, this.latestOffset - 3), ConsumerRecord.create(LoadTest.topicName, ByteString.copyFromUtf8("k2"), ByteString.copyFromUtf8("v2"), 0, this.latestOffset - 2), ConsumerRecord.create(LoadTest.topicName, ByteString.copyFromUtf8("k3"), ByteString.copyFromUtf8("v3"), 0, this.latestOffset - 1));
        }

        private void schedulePoll() {
            this.consumer.schedulePollTask(new Runnable() { // from class: io.confluent.kafkarest.v2.LoadTest.ConsumerTestRun.2
                @Override // java.lang.Runnable
                public void run() {
                    ConsumerTestRun.this.consumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord(LoadTest.topicName, 0, ConsumerTestRun.this.latestOffset, "k1".getBytes(), "v1".getBytes()));
                    ConsumerTestRun.this.consumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord(LoadTest.topicName, 0, ConsumerTestRun.this.latestOffset + 1, "k2".getBytes(), "v2".getBytes()));
                    ConsumerTestRun.this.consumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord(LoadTest.topicName, 0, ConsumerTestRun.this.latestOffset + 2, "k3".getBytes(), "v3".getBytes()));
                    ConsumerTestRun.this.latestOffset += 3;
                }
            });
        }
    }

    @Test
    public void testMultipleConsumerMultipleGroups() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "PLAINTEXT://hostname:9092");
        properties.setProperty("consumer.threads", "-1");
        properties.setProperty("consumer.request.timeout.ms", Long.toString(this.requestTimeoutMs));
        this.config = new KafkaRestConfig(properties, new SystemTime());
        this.consumerManager = new KafkaConsumerManager(this.config, this.consumerFactory);
        ArrayList<ConsumerTestRun> arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            for (int i2 = 0; i2 < 10; i2++) {
                arrayList.add(new ConsumerTestRun(this, new MockConsumer(OffsetResetStrategy.EARLIEST, Integer.toString(i))));
            }
        }
        this.capturedConsumerConfig = Capture.newInstance();
        IExpectationSetters expect = EasyMock.expect(this.consumerFactory.createConsumer((Properties) EasyMock.capture(this.capturedConsumerConfig)));
        Method method = expect.getClass().getMethod("andReturn", Object.class);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            method.invoke(expect, ((ConsumerTestRun) it.next()).consumer);
        }
        EasyMock.replay(new Object[]{this.consumerFactory});
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((ConsumerTestRun) it2.next()).bootstrap();
        }
        Iterator it3 = arrayList.iterator();
        while (it3.hasNext()) {
            ((ConsumerTestRun) it3.next()).read();
        }
        for (int i3 = 0; i3 < 30; i3++) {
            for (ConsumerTestRun consumerTestRun : arrayList) {
                consumerTestRun.awaitRead();
                consumerTestRun.verifyRead();
                Thread.sleep(this.random.nextInt(5));
                consumerTestRun.read();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void bootstrapConsumer(MockConsumer<byte[], byte[]> mockConsumer) {
        String createConsumer = this.consumerManager.createConsumer(mockConsumer.groupName, ConsumerInstanceConfig.create(EmbeddedFormat.BINARY));
        mockConsumer.cid(createConsumer);
        this.consumerManager.subscribe(mockConsumer.groupName, createConsumer, new ConsumerSubscriptionRecord(Collections.singletonList(topicName), (String) null));
        mockConsumer.rebalance(Collections.singletonList(new TopicPartition(topicName, 0)));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(new TopicPartition(topicName, 0), 0L));
    }
}
