package io.confluent.kafkarest.unit;

import io.confluent.kafkarest.SimpleConsumerFactory;
import io.confluent.kafkarest.SimpleConsumerPool;
import io.confluent.kafkarest.SimpleFetcher;
import io.confluent.kafkarest.SystemTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.javaapi.consumer.SimpleConsumer;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/kafkarest/unit/SimpleConsumerPoolTest.class */
public class SimpleConsumerPoolTest {
    private final int AWAIT_TERMINATION_TIMEOUT = 2000;
    private final int POOL_CALLER_SLEEP_TIME = 50;
    private SimpleConsumerFactory simpleConsumerFactory = (SimpleConsumerFactory) EasyMock.createMock(SimpleConsumerFactory.class);

    /* loaded from: input_file:io/confluent/kafkarest/unit/SimpleConsumerPoolTest$PoolCaller.class */
    private class PoolCaller implements Runnable {
        private SimpleConsumerPool pool;

        public PoolCaller(SimpleConsumerPool simpleConsumerPool) {
            this.pool = simpleConsumerPool;
        }

        @Override // java.lang.Runnable
        public void run() {
            SimpleFetcher simpleFetcher = this.pool.get("", 0);
            try {
                Thread.sleep(50L);
                simpleFetcher.close();
            } catch (Exception e) {
                Assert.fail(e.getMessage());
            }
        }
    }

    @Before
    public void setUp() throws Exception {
        EasyMock.reset(new Object[]{this.simpleConsumerFactory});
        EasyMock.expect(this.simpleConsumerFactory.createConsumer("", 0)).andStubAnswer(new IAnswer<SimpleConsumer>() { // from class: io.confluent.kafkarest.unit.SimpleConsumerPoolTest.1
            private AtomicInteger clientIdCounter = new AtomicInteger(0);

            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public SimpleConsumer m16answer() throws Throwable {
                SimpleConsumer simpleConsumer = (SimpleConsumer) EasyMock.createMockBuilder(SimpleConsumer.class).addMockedMethod("clientId").createMock();
                EasyMock.expect(simpleConsumer.clientId()).andReturn("clientid-" + this.clientIdCounter.getAndIncrement()).anyTimes();
                EasyMock.replay(new Object[]{simpleConsumer});
                return simpleConsumer;
            }
        });
        EasyMock.replay(new Object[]{this.simpleConsumerFactory});
    }

    @Test
    public void testPoolWhenOneSingleThreadedCaller() throws Exception {
        SimpleConsumerPool simpleConsumerPool = new SimpleConsumerPool(3, 1000, new SystemTime(), this.simpleConsumerFactory);
        for (int i = 0; i < 10; i++) {
            simpleConsumerPool.get("", 0).close();
        }
        Assert.assertTrue(simpleConsumerPool.size() == 1);
    }

    @Test
    public void testPoolWhenMultiThreadedCaller() throws Exception {
        SimpleConsumerPool simpleConsumerPool = new SimpleConsumerPool(3, 1000, new SystemTime(), this.simpleConsumerFactory);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            newFixedThreadPool.execute(new PoolCaller(simpleConsumerPool));
        }
        newFixedThreadPool.shutdown();
        Assert.assertTrue(newFixedThreadPool.awaitTermination(2000L, TimeUnit.MILLISECONDS));
        Assert.assertEquals(3L, simpleConsumerPool.size());
    }

    @Test
    public void testUnlimitedPoolWhenMultiThreadedCaller() throws Exception {
        SimpleConsumerPool simpleConsumerPool = new SimpleConsumerPool(0, 1000, new SystemTime(), this.simpleConsumerFactory);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            newFixedThreadPool.execute(new PoolCaller(simpleConsumerPool));
        }
        newFixedThreadPool.shutdown();
        Assert.assertTrue(newFixedThreadPool.awaitTermination(2000L, TimeUnit.MILLISECONDS));
        Assert.assertTrue(simpleConsumerPool.size() > 0);
    }

    @Test
    public void testPoolTimeoutError() throws Exception {
        SimpleConsumerPool simpleConsumerPool = new SimpleConsumerPool(1, 1, new SystemTime(), this.simpleConsumerFactory);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(newFixedThreadPool.submit(new PoolCaller(simpleConsumerPool)));
        }
        newFixedThreadPool.shutdown();
        Assert.assertTrue(newFixedThreadPool.awaitTermination(2000L, TimeUnit.MILLISECONDS));
        boolean z = false;
        try {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((Future) it.next()).get();
            }
        } catch (ExecutionException e) {
            if (e.getCause().getErrorCode() == 50301) {
                z = true;
            }
        }
        Assert.assertTrue(z);
    }

    @Test
    public void testPoolNoTimeout() throws Exception {
        SimpleConsumerPool simpleConsumerPool = new SimpleConsumerPool(1, 0, new SystemTime(), this.simpleConsumerFactory);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            newFixedThreadPool.submit(new PoolCaller(simpleConsumerPool));
        }
        newFixedThreadPool.shutdown();
        Assert.assertFalse(newFixedThreadPool.awaitTermination(250L, TimeUnit.MILLISECONDS));
    }
}
