package kafka.tier.fetcher;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.tier.fetcher.MemoryTracker;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:kafka/tier/fetcher/MemoryTrackerTest.class */
public class MemoryTrackerTest {
    @Test
    public void testLeaseReclaim() {
        MockTime mockTime = new MockTime();
        CancellationContext newContext = CancellationContext.newContext();
        MemoryTracker memoryTracker = new MemoryTracker(mockTime, 1024L);
        MemoryTracker.MemoryLease newLease = memoryTracker.newLease(newContext, 1024L);
        Assert.assertEquals(memoryTracker.leased(), newLease.leased());
        newLease.release();
        Assert.assertEquals(memoryTracker.leased(), 0L);
    }

    @Test
    public void testTryLease() {
        MockTime mockTime = new MockTime();
        CancellationContext newContext = CancellationContext.newContext();
        MemoryTracker memoryTracker = new MemoryTracker(mockTime, 1024L);
        MemoryTracker.MemoryLease newLease = memoryTracker.newLease(newContext, 1024L);
        Assert.assertFalse(memoryTracker.tryLease(1024L).isPresent());
        newLease.release();
        Assert.assertTrue(memoryTracker.tryLease(1024L).isPresent());
    }

    @Test
    public void testTryLeaseBurst() {
        MockTime mockTime = new MockTime();
        CancellationContext newContext = CancellationContext.newContext();
        MemoryTracker memoryTracker = new MemoryTracker(mockTime, 1024L);
        memoryTracker.newLease(newContext, 5120L);
        Assert.assertFalse(memoryTracker.tryLease(1L).isPresent());
    }

    @Test
    public void testLeaseExtend() {
        MockTime mockTime = new MockTime();
        CancellationContext newContext = CancellationContext.newContext();
        MemoryTracker memoryTracker = new MemoryTracker(mockTime, 1024L);
        MemoryTracker.MemoryLease newLease = memoryTracker.newLease(newContext, 512L);
        Assert.assertTrue(newLease.tryExtendLease(512L));
        Assert.assertFalse(newLease.tryExtendLease(512L));
        newLease.release();
        Assert.assertEquals(memoryTracker.leased(), 0L);
    }

    @Test
    public void testCancelledNewLeaseClaimsNothing() throws InterruptedException {
        MockTime mockTime = new MockTime();
        CancellationContext newContext = CancellationContext.newContext();
        MemoryTracker memoryTracker = new MemoryTracker(mockTime, 1024L);
        memoryTracker.newLease(CancellationContext.newContext(), 1024L);
        Thread thread = new Thread(() -> {
            memoryTracker.newLease(newContext, 1024L);
        });
        newContext.cancel();
        memoryTracker.wakeup();
        thread.join();
        Assert.assertEquals("expected no additional memory to be taken from the pool", memoryTracker.leased(), 1024L);
    }

    @Test
    public void testReclaimedLeaseUnblocksWaiter() throws InterruptedException {
        MockTime mockTime = new MockTime();
        CancellationContext newContext = CancellationContext.newContext();
        MemoryTracker memoryTracker = new MemoryTracker(mockTime, 1024L);
        MemoryTracker.MemoryLease newLease = memoryTracker.newLease(CancellationContext.newContext(), 1024L);
        Thread thread = new Thread(() -> {
            memoryTracker.newLease(newContext, 1024L);
        });
        newLease.release();
        thread.join(1000L);
    }

    private boolean futureDone(Future<?> future, long j, TimeUnit timeUnit) {
        try {
            future.get(j, timeUnit);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Test
    public void testPoolSizeZeroIsUnrestricted() {
        MockTime mockTime = new MockTime();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        CancellationContext newContext = CancellationContext.newContext();
        MemoryTracker memoryTracker = new MemoryTracker(mockTime, 0L);
        Assert.assertTrue(memoryTracker.isDisabled());
        Optional tryLease = memoryTracker.tryLease(1024L);
        Assert.assertTrue(tryLease.isPresent());
        Optional tryLease2 = memoryTracker.tryLease(1024L);
        Assert.assertTrue(tryLease2.isPresent());
        Assert.assertTrue("expected MemoryTracker::newLease not to block", futureDone(newSingleThreadExecutor.submit(() -> {
            memoryTracker.newLease(newContext, 1024L).release();
        }), 5L, TimeUnit.SECONDS));
        Assert.assertEquals(2048L, memoryTracker.leased());
        tryLease.ifPresent((v0) -> {
            v0.release();
        });
        Assert.assertEquals(1024L, memoryTracker.leased());
        tryLease2.ifPresent((v0) -> {
            v0.release();
        });
        Assert.assertEquals(0L, memoryTracker.leased());
    }

    @Test
    public void testChangingPoolSizeDynamicallyWakesBlockedRequests() {
        MockTime mockTime = new MockTime();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        CancellationContext newContext = CancellationContext.newContext();
        MemoryTracker memoryTracker = new MemoryTracker(mockTime, 1024L);
        memoryTracker.newLease(newContext, 1024L);
        Future<?> submit = newSingleThreadExecutor.submit(() -> {
            memoryTracker.newLease(newContext, 1024L).release();
        });
        Assert.assertFalse("memory acquisition should be blocked", futureDone(submit, 5L, TimeUnit.SECONDS));
        memoryTracker.setPoolSize(0L);
        Assert.assertTrue("expected setting the pool size to 0 would unblock memory acquisition", futureDone(submit, 5L, TimeUnit.SECONDS));
    }

    @Test
    public void testChangingPoolSizeDynamically() {
        MemoryTracker memoryTracker = new MemoryTracker(new MockTime(), 1024L);
        Assert.assertTrue(memoryTracker.tryLease(1024L).isPresent());
        Assert.assertFalse(memoryTracker.tryLease(1024L).isPresent());
        memoryTracker.setPoolSize(2048L);
        Assert.assertTrue(memoryTracker.tryLease(1024L).isPresent());
        Assert.assertFalse(memoryTracker.tryLease(1024L).isPresent());
    }

    @Test
    public void testOomTimeSensor() throws InterruptedException, ExecutionException, TimeoutException {
        MockTime mockTime = new MockTime();
        Metrics metrics = new Metrics(mockTime);
        MemoryTracker memoryTracker = new MemoryTracker(mockTime, metrics, 1024L);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        KafkaMetric metric = metrics.metric(memoryTracker.memoryTrackerDepletedTimeMetricName);
        KafkaMetric metric2 = metrics.metric(memoryTracker.memoryTrackerDepletedPercentMetricName);
        CancellationContext newContext = CancellationContext.newContext();
        MemoryTracker.MemoryLease newLease = memoryTracker.newLease(newContext, 1024L);
        Future<?> submit = newSingleThreadExecutor.submit(() -> {
            memoryTracker.newLease(newContext, 1024L).release();
        });
        Thread.sleep(500L);
        mockTime.sleep(10000L);
        newLease.release();
        submit.get();
        Assert.assertEquals("expected 10 seconds of blocked time", ((Double) metric.metricValue()).doubleValue(), 10000.0d, 0.0d);
        Assert.assertTrue("expected a nonzero amount of blocked time", ((Double) metric2.metricValue()).doubleValue() > 0.0d);
    }
}
