package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/BufferPoolTest.class */
public class BufferPoolTest {
    // Mock clock used by the metrics registry below and passed to most of the pools under test.
    private final MockTime time = new MockTime();
    // Metrics registry handed to the pools under test; closed in teardown().
    private final Metrics metrics = new Metrics(this.time);
    // Block time in milliseconds.
    // NOTE(review): nothing in this file reads this field; the tests pass the literal 2000L to allocate().
    private final long maxBlockTimeMs = 2000;
    // Metric group name.
    // NOTE(review): nothing in this file reads this field; the tests pass the literal "TestMetrics".
    private final String metricGroup = "TestMetrics";

    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/BufferPoolTest$BufferPoolAllocator.class */
    /**
     * Runnable that tries to allocate 2 bytes from the given pool and expects the attempt
     * not to succeed: the allocation must end in a timeout or an interruption.
     */
    private static class BufferPoolAllocator implements Runnable {
        BufferPool pool;
        long maxBlockTimeMs;

        /**
         * @param pool           pool to allocate from
         * @param maxBlockTimeMs value passed as the second argument of {@code allocate}
         */
        BufferPoolAllocator(BufferPool pool, long maxBlockTimeMs) {
            this.pool = pool;
            this.maxBlockTimeMs = maxBlockTimeMs;
        }

        @Override
        public void run() {
            try {
                pool.allocate(2, maxBlockTimeMs);
                // NOTE(review): this runs on a worker thread, so the AssertionError raised here is
                // not propagated to the test thread that started it.
                Assert.fail("The buffer allocated more memory than its maximum value 2");
            } catch (InterruptedException | TimeoutException ignored) {
                // Both outcomes are acceptable: the allocation did not succeed.
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/BufferPoolTest$StressTestThread.class */
    /**
     * Worker thread that repeatedly allocates a buffer from the pool and immediately returns it.
     * {@link #success} is set to {@code true} only if every iteration completes without an exception.
     */
    public static class StressTestThread extends Thread {
        private final int iterations;
        private final BufferPool pool;
        private final long maxBlockTimeMs = 20000;
        public final AtomicBoolean success = new AtomicBoolean(false);

        /**
         * @param pool       pool shared by all stress threads
         * @param iterations number of allocate/deallocate rounds to perform
         */
        public StressTestThread(BufferPool pool, int iterations) {
            this.iterations = iterations;
            this.pool = pool;
        }

        @Override
        public void run() {
            try {
                for (int round = 0; round < iterations; round++) {
                    int size;
                    if (TestUtils.RANDOM.nextBoolean()) {
                        // Request exactly the poolable size.
                        size = pool.poolableSize();
                    } else {
                        // Request a random size below the pool's total memory.
                        size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
                    }
                    ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs);
                    pool.deallocate(buffer);
                }
                success.set(true);
            } catch (Exception e) {
                // Any failure ends the run early and leaves success at false.
                e.printStackTrace();
            }
        }
    }

    /** Closes the metrics registry shared by the tests once each test has finished. */
    @After
    public void teardown() {
        metrics.close();
    }

    /**
     * Walks through allocate/deallocate on a single thread and checks the pool's accounting
     * (available vs. unallocated memory) and that a recycled buffer comes back cleared.
     */
    @Test
    public void testSimple() throws Exception {
        final long totalMemory = 64 * 1024;
        final int size = 1024;
        final long blockMs = 2000L;
        BufferPool pool = new BufferPool(totalMemory, size, this.metrics, this.time, "TestMetrics");

        // First allocation of the poolable size.
        ByteBuffer buffer = pool.allocate(size, blockMs);
        Assert.assertEquals("Buffer size should equal requested size.", size, buffer.limit());
        Assert.assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
        Assert.assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory());

        // Dirty the buffer before returning it so the "cleared" checks below are meaningful.
        buffer.putInt(1);
        buffer.flip();
        pool.deallocate(buffer);
        Assert.assertEquals("All memory should be available", totalMemory, pool.availableMemory());
        Assert.assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory());

        // Second allocation of the same size.
        buffer = pool.allocate(size, blockMs);
        Assert.assertEquals("Recycled buffer should be cleared.", 0L, buffer.position());
        Assert.assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit());
        pool.deallocate(buffer);
        Assert.assertEquals("All memory should be available", totalMemory, pool.availableMemory());
        Assert.assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory());

        // A buffer of a non-poolable size is allocated and returned.
        buffer = pool.allocate(2 * size, blockMs);
        pool.deallocate(buffer);
        Assert.assertEquals("All memory should be available", totalMemory, pool.availableMemory());
        Assert.assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory());
    }

    /**
     * Requesting more memory than the pool's total must be rejected with an
     * {@link IllegalArgumentException}.
     *
     * <p>The expectation is pinned to the oversized request only; with
     * {@code @Test(expected = ...)} an {@code IllegalArgumentException} thrown by any earlier
     * statement (pool construction, the legal allocation, the deallocation) would also have
     * made the test pass.
     */
    @Test
    public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
        BufferPool bufferPool = new BufferPool(1024L, 512, this.metrics, this.time, "TestMetrics");

        // Allocating exactly the total memory is legal.
        ByteBuffer buffer = bufferPool.allocate(1024, 2000L);
        Assert.assertEquals(1024L, buffer.limit());
        bufferPool.deallocate(buffer);

        // One byte more than the total must be refused.
        Assert.assertThrows(IllegalArgumentException.class, () -> bufferPool.allocate(1025, 2000L));
    }

    /**
     * Starts an allocation that needs the pool's entire memory while one buffer is still
     * outstanding, then releases that buffer and expects the allocating thread to finish
     * within one second.
     */
    @Test
    public void testDelayedAllocation() throws Exception {
        final int poolableSize = 1024;
        final int totalMemory = 5 * poolableSize;
        BufferPool pool = new BufferPool((long) totalMemory, poolableSize, this.metrics, this.time, "TestMetrics");

        ByteBuffer outstanding = pool.allocate(poolableSize, 2000L);
        CountDownLatch doDealloc = asyncDeallocate(pool, outstanding);
        CountDownLatch allocation = asyncAllocate(pool, totalMemory);
        Assert.assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());

        // Let the deallocating thread return the outstanding buffer.
        doDealloc.countDown();
        boolean finished = allocation.await(1L, TimeUnit.SECONDS);
        Assert.assertTrue("Allocation should succeed soon after de-allocation", finished);
    }

    /**
     * Starts a thread that waits on a latch and then returns {@code byteBuffer} to
     * {@code bufferPool}.
     *
     * @return the latch the caller counts down to trigger the deallocation
     */
    private CountDownLatch asyncDeallocate(final BufferPool bufferPool, final ByteBuffer byteBuffer) {
        final CountDownLatch trigger = new CountDownLatch(1);
        Runnable deallocateWhenTriggered = () -> {
            try {
                trigger.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // The buffer is returned even if the wait was interrupted.
            bufferPool.deallocate(byteBuffer);
        };
        Thread deallocator = new Thread(deallocateWhenTriggered);
        deallocator.start();
        return trigger;
    }

    /**
     * Starts a thread that sleeps for {@code j} on the system clock and then returns
     * {@code byteBuffer} to {@code bufferPool}.
     */
    private void delayedDeallocate(final BufferPool bufferPool, final ByteBuffer byteBuffer, final long j) {
        Runnable sleepThenDeallocate = () -> {
            Time.SYSTEM.sleep(j);
            bufferPool.deallocate(byteBuffer);
        };
        Thread deallocator = new Thread(sleepThenDeallocate);
        deallocator.start();
    }

    /**
     * Starts a thread that requests {@code i} bytes from {@code bufferPool}.
     *
     * @return a latch that is counted down once the allocate call has returned or thrown
     */
    private CountDownLatch asyncAllocate(final BufferPool bufferPool, final int i) {
        final CountDownLatch completed = new CountDownLatch(1);
        Runnable allocateOnce = () -> {
            try {
                bufferPool.allocate(i, 2000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // Counted down on every exit path, not only on a successful allocation.
                completed.countDown();
            }
        };
        Thread allocator = new Thread(allocateOnce);
        allocator.start();
        return completed;
    }

    /**
     * Uses the real system clock to check that an allocation which cannot be satisfied
     * fails with a {@link TimeoutException} no earlier than the 2000 ms block time and
     * less than one second after it.
     *
     * <p>Three 1-byte buffers are handed back by background threads after 1000 ms,
     * 2000 ms and 5000 ms, so a request for all 10 bytes cannot be met within 2000 ms.
     */
    @Test
    public void testBlockTimeout() throws Exception {
        final long blockMs = 2000L;
        BufferPool bufferPool = new BufferPool(10L, 1, this.metrics, Time.SYSTEM, "TestMetrics");
        ByteBuffer buffer1 = bufferPool.allocate(1, blockMs);
        ByteBuffer buffer2 = bufferPool.allocate(1, blockMs);
        ByteBuffer buffer3 = bufferPool.allocate(1, blockMs);
        delayedDeallocate(bufferPool, buffer1, 1000L);
        delayedDeallocate(bufferPool, buffer2, 2000L);
        delayedDeallocate(bufferPool, buffer3, 5000L);

        long beginTimeMs = Time.SYSTEM.milliseconds();
        try {
            bufferPool.allocate(10, blockMs);
            Assert.fail("The buffer allocated more memory than its maximum value 10");
        } catch (TimeoutException expected) {
            // The request could not be satisfied within the block time.
        }
        // Take the duration right away so later work in this test does not inflate it.
        long durationMs = Time.SYSTEM.milliseconds() - beginTimeMs;

        // Read the value once: the deallocator threads are still running, so separate reads
        // for the message and for each bound could observe different values.
        long available = bufferPool.availableMemory();
        Assert.assertTrue("available memory " + available, available >= 8 && available <= 10);
        Assert.assertTrue("TimeoutException should not throw before maxBlockTimeMs", durationMs >= blockMs);
        Assert.assertTrue("TimeoutException should throw soon after maxBlockTimeMs", durationMs < blockMs + 1000);
    }

    /**
     * After an allocation times out, no waiter may remain queued and the memory that was
     * free before the attempt must still be reported as available.
     */
    @Test
    public void testCleanupMemoryAvailabilityWaiterOnBlockTimeout() throws Exception {
        final long blockMs = 2000L;
        BufferPool pool = new BufferPool(2L, 1, this.metrics, this.time, "TestMetrics");

        // Hold one of the two bytes so the next request for two bytes cannot be met.
        pool.allocate(1, blockMs);
        try {
            pool.allocate(2, blockMs);
            Assert.fail("The buffer allocated more memory than its maximum value 2");
        } catch (TimeoutException expected) {
            // The timeout is the outcome under test.
        }

        Assert.assertEquals(0L, pool.queued());
        Assert.assertEquals(1L, pool.availableMemory());
    }

    /**
     * Starts two allocator threads that cannot be satisfied, interrupts them one after the
     * other, and checks that the waiter observed at the head of the queue while only the
     * first thread was started differs from the one at the tail after the first thread was
     * interrupted, and that no waiter is left queued once both threads have finished.
     */
    @Test
    public void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception {
        BufferPool bufferPool = new BufferPool(2L, 1, this.metrics, this.time, "TestMetrics");
        // Hold one of the two bytes so the 2-byte requests below cannot be met.
        bufferPool.allocate(1, 2000L);

        Thread firstAllocator = new Thread(new BufferPoolAllocator(bufferPool, 5000L));
        Thread secondAllocator = new Thread(new BufferPoolAllocator(bufferPool, 5000L));

        // The sleeps give each thread time to reach its allocate call before the next step.
        firstAllocator.start();
        Thread.sleep(500L);
        Deque<Condition> waiters = bufferPool.waiters();
        Condition firstWaiter = waiters.getFirst();

        secondAllocator.start();
        Thread.sleep(500L);
        firstAllocator.interrupt();
        Thread.sleep(500L);
        Condition lastWaiter = waiters.getLast();
        secondAllocator.interrupt();

        Assert.assertNotEquals(firstWaiter, lastWaiter);
        firstAllocator.join();
        secondAllocator.join();
        // Expected value first, so a failure reports "expected 0 but was N".
        Assert.assertEquals(0L, bufferPool.queued());
    }

    /**
     * Makes {@code recordWaitTime} throw an {@link OutOfMemoryError} on a spied pool and
     * checks that the failed allocation leaves the pool's accounting intact: one byte
     * available, no queued waiters, and a further 1-byte allocation still possible.
     *
     * <p>The pool gets its own {@link Metrics} instance, which is closed in a
     * {@code finally} block so it is not leaked when the test ends.
     */
    @Test
    public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception {
        Metrics localMetrics = new Metrics();
        try {
            BufferPool bufferPool = Mockito.spy(new BufferPool(2L, 1, localMetrics, this.time, "TestMetrics"));
            Mockito.doThrow(new OutOfMemoryError()).when(bufferPool).recordWaitTime(ArgumentMatchers.anyLong());

            // Hold one of the two bytes so the 2-byte request below cannot be met immediately.
            bufferPool.allocate(1, 0L);
            try {
                bufferPool.allocate(2, 1000L);
                Assert.fail("Expected oom.");
            } catch (OutOfMemoryError expected) {
                // Thrown by the stubbed recordWaitTime.
            }

            Assert.assertEquals(1L, bufferPool.availableMemory());
            Assert.assertEquals(0L, bufferPool.queued());
            Assert.assertEquals(1L, bufferPool.unallocatedMemory());

            // The remaining byte must still be allocatable.
            bufferPool.allocate(1, 0L);
            // recordWaitTime must have been invoked exactly once (Mockito's default verification mode).
            Mockito.verify(bufferPool).recordWaitTime(ArgumentMatchers.anyLong());
        } finally {
            localMetrics.close();
        }
    }

    /**
     * Runs ten {@link StressTestThread}s of 50000 iterations each against one shared pool
     * and checks that every thread succeeded and that all memory is available afterwards.
     */
    @Test
    public void testStressfulSituation() throws Exception {
        final int numThreads = 10;
        final int iterations = 50000;
        final int poolableSize = 1024;
        final long totalMemory = (numThreads / 2) * poolableSize;
        BufferPool pool = new BufferPool(totalMemory, poolableSize, this.metrics, this.time, "TestMetrics");

        ArrayList<StressTestThread> workers = new ArrayList<>();
        for (int n = 0; n < numThreads; n++) {
            workers.add(new StressTestThread(pool, iterations));
        }
        for (StressTestThread worker : workers) {
            worker.start();
        }
        for (StressTestThread worker : workers) {
            worker.join();
        }
        for (StressTestThread worker : workers) {
            Assert.assertTrue("Thread should have completed all iterations successfully.", worker.success.get());
        }
        Assert.assertEquals(totalMemory, pool.availableMemory());
    }

    /**
     * Checks available-memory accounting with totals beyond the {@code int} range
     * (20,000,000,000 bytes in 2,000,000,000-byte buffers).
     *
     * <p>The pool is subclassed so that no real memory is reserved and so that the
     * free-list size it reports is driven by a counter controlled from the test.
     */
    @Test
    public void testLargeAvailableMemory() throws Exception {
        final long totalMemory = 20_000_000_000L;
        final int poolableSize = 2_000_000_000;
        final AtomicInteger freeBuffers = new AtomicInteger(0);
        BufferPool pool = new BufferPool(totalMemory, poolableSize, this.metrics, this.time, "TestMetrics") {
            @Override
            protected ByteBuffer allocateByteBuffer(int size) {
                // Ignore the requested size and return an empty buffer.
                return ByteBuffer.allocate(0);
            }

            @Override
            protected int freeSize() {
                return freeBuffers.get();
            }
        };

        pool.allocate(poolableSize, 0L);
        Assert.assertEquals(18_000_000_000L, pool.availableMemory());
        pool.allocate(poolableSize, 0L);
        Assert.assertEquals(16_000_000_000L, pool.availableMemory());

        // Each increment of the reported free-list size adds one poolable buffer's worth of memory.
        freeBuffers.incrementAndGet();
        Assert.assertEquals(18_000_000_000L, pool.availableMemory());
        freeBuffers.incrementAndGet();
        Assert.assertEquals(20_000_000_000L, pool.availableMemory());
    }

    /**
     * When the underlying buffer allocation fails with an {@link OutOfMemoryError}, the
     * error must reach the caller and the pool must report all of its memory as available
     * again.
     *
     * <p>The allocation goes through {@code allocate}. Calling the overridden
     * {@code allocateByteBuffer} hook directly would throw without the pool ever reserving
     * memory, which would make the final assertion trivially true.
     */
    @Test
    public void outOfMemoryOnAllocation() {
        BufferPool bufferPool = new BufferPool(1024L, 1024, this.metrics, this.time, "TestMetrics") {
            @Override
            protected ByteBuffer allocateByteBuffer(int size) {
                throw new OutOfMemoryError();
            }
        };

        Assert.assertThrows(OutOfMemoryError.class, () -> bufferPool.allocate(1024, 2000L));

        // Expected value first, so a failure reports "expected 1024 but was N".
        Assert.assertEquals(1024L, bufferPool.availableMemory());
    }

    /**
     * After the pool is closed, a new allocation must fail with a {@link KafkaException},
     * and a buffer obtained before the close is then handed back via {@code deallocate}.
     */
    @Test
    public void testCloseAllocations() throws Exception {
        BufferPool pool = new BufferPool(10L, 1, this.metrics, Time.SYSTEM, "TestMetrics");
        ByteBuffer outstanding = pool.allocate(1, 2000L);

        pool.close();
        Assert.assertThrows(KafkaException.class, () -> pool.allocate(1, 2000L));

        // Return the buffer that was taken before the pool was closed.
        pool.deallocate(outstanding);
    }

    /**
     * Closing the pool must make allocation attempts by other threads fail with a
     * {@link KafkaException}.
     *
     * <p>Two workers request the pool's only byte, which the test thread already holds,
     * with an effectively unbounded block time. The test fails if the workers have not
     * both observed the exception within 15 seconds of the close, and the executor is
     * always shut down so its threads do not outlive the test.
     */
    @Test
    public void testCloseNotifyWaiters() throws Exception {
        final int numWorkers = 2;
        final BufferPool bufferPool = new BufferPool(1L, 1, this.metrics, Time.SYSTEM, "TestMetrics");
        ByteBuffer buffer = bufferPool.allocate(1, Long.MAX_VALUE);

        final CountDownLatch completionLatch = new CountDownLatch(numWorkers);
        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
        try {
            Callable<Void> work = () -> {
                Assert.assertThrows(KafkaException.class, () -> bufferPool.allocate(1, Long.MAX_VALUE));
                // Reached only if the allocation failed with the expected exception type.
                completionLatch.countDown();
                return null;
            };
            for (int i = 0; i < numWorkers; i++) {
                executor.submit(work);
            }

            Assert.assertEquals("Allocation shouldn't have happened yet, waiting on memory",
                    (long) numWorkers, completionLatch.getCount());

            bufferPool.close();

            // Check the result of await: ignoring it would let the test pass even when the
            // workers were never released.
            Assert.assertTrue("Workers should have failed with KafkaException after the pool was closed",
                    completionLatch.await(15L, TimeUnit.SECONDS));

            bufferPool.deallocate(buffer);
        } finally {
            // Interrupts any worker that is still blocked and releases the pool threads.
            executor.shutdownNow();
        }
    }
}
