package kafka.server.link;

import kafka.server.FetcherPool;
import kafka.server.FetcherPool$;
import kafka.server.FetcherPool$Default$;
import kafka.server.FetcherPool$InSync$;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.collection.StringOps$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: ClusterLinkFetchResponseAllocatorTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u00055e\u0001\u0002\u000b\u0016\u0001qAQa\t\u0001\u0005\u0002\u0011Bqa\n\u0001C\u0002\u0013%\u0001\u0006\u0003\u0004.\u0001\u0001\u0006I!\u000b\u0005\b]\u0001\u0011\r\u0011\"\u00030\u0011\u00191\u0005\u0001)A\u0005a!9q\t\u0001a\u0001\n\u0013A\u0005b\u0002.\u0001\u0001\u0004%Ia\u0017\u0005\u0007C\u0002\u0001\u000b\u0015B%\t\u000b\t\u0004A\u0011A2\t\u000b=\u0004A\u0011\u00019\t\u000f\u0005=\u0001\u0001\"\u0001\u0002\u0012!9\u00111\u0004\u0001\u0005\u0002\u0005u\u0001BBA\u0014\u0001\u0011\u00051\r\u0003\u0004\u00022\u0001!\ta\u0019\u0005\u0007\u0003k\u0001A\u0011A2\t\r\u0005e\u0002\u0001\"\u0001d\u0011\u001d\ti\u0004\u0001C\u0005\u0003\u007fAq!!\u001b\u0001\t\u0013\tY\u0007C\u0004\u0002X\u0001!I!!\"\u0003K\rcWo\u001d;fe2Kgn\u001b$fi\u000eD'+Z:q_:\u001cX-\u00117m_\u000e\fGo\u001c:UKN$(B\u0001\f\u0018\u0003\u0011a\u0017N\\6\u000b\u0005aI\u0012AB:feZ,'OC\u0001\u001b\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u000f\u0011\u0005y\tS\"A\u0010\u000b\u0003\u0001\nQa]2bY\u0006L!AI\u0010\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}Q\tQ\u0005\u0005\u0002'\u00015\tQ#\u0001\u0007ce>\\WM]\"p]\u001aLw-F\u0001*!\tQ3&D\u0001\u0018\u0013\tasCA\u0006LC\u001a\\\u0017mQ8oM&<\u0017!\u00042s_.,'oQ8oM&<\u0007%\u0001\ndkJ\u0014XM\u001c;Ce>\\WM\u001d)s_B\u001cX#\u0001\u0019\u0011\tE2\u0004hQ\u0007\u0002e)\u00111\u0007N\u0001\b[V$\u0018M\u00197f\u0015\t)t$\u0001\u0006d_2dWm\u0019;j_:L!a\u000e\u001a\u0003\u00075\u000b\u0007\u000f\u0005\u0002:\u0001:\u0011!H\u0010\t\u0003w}i\u0011\u0001\u0010\u0006\u0003{m\ta\u0001\u0010:p_Rt\u0014BA  
\u0003\u0019\u0001&/\u001a3fM&\u0011\u0011I\u0011\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005}z\u0002C\u0001\u0010E\u0013\t)uDA\u0002B]f\f1cY;se\u0016tGO\u0011:pW\u0016\u0014\bK]8qg\u0002\na\u0002\u001e5sK\u0006$\u0017I\u001a4j]&$\u00180F\u0001J!\tQ\u0005,D\u0001L\u0015\taU*\u0001\tD_:4G.^3oi\u000e{gNZ5hg*\u0011ajT\u0001\nS:$XM\u001d8bYNT!\u0001U)\u0002\r\r|gNZ5h\u0015\t\u00116+\u0001\u0004d_6lwN\u001c\u0006\u00035QS!!\u0016,\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u00059\u0016aA8sO&\u0011\u0011l\u0013\u0002\u001a\u00072,8\u000f^3s\u0019&t7\u000e\u00165sK\u0006$\u0017I\u001a4j]&$\u00180\u0001\nuQJ,\u0017\rZ!gM&t\u0017\u000e^=`I\u0015\fHC\u0001/`!\tqR,\u0003\u0002_?\t!QK\\5u\u0011\u001d\u0001w!!AA\u0002%\u000b1\u0001\u001f\u00132\u0003=!\bN]3bI\u00063g-\u001b8jif\u0004\u0013!B:fiV\u0003H#\u0001/)\u0005%)\u0007C\u00014n\u001b\u00059'B\u00015j\u0003\r\t\u0007/\u001b\u0006\u0003U.\fqA[;qSR,'O\u0003\u0002m-\u0006)!.\u001e8ji&\u0011an\u001a\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017A\u0007;fgR4U\r^2i%\u0016\u001c\bo\u001c8tK\u0006cGn\\2bi>\u0014HC\u0001/r\u0011\u0015\u0011(\u00021\u00019\u0003=1W\r^2iKJ\u0004vn\u001c7OC6,\u0007F\u0001\u0006u!\t)\b0D\u0001w\u0015\t9\u0018.\u0001\u0004qCJ\fWn]\u0005\u0003sZ\u0014\u0011\u0003U1sC6,G/\u001a:ju\u0016$G+Z:uQ\u0019Q10a\u0001\u0002\u0006A\u0011Ap`\u0007\u0002{*\u0011aP^\u0001\taJ|g/\u001b3fe&\u0019\u0011\u0011A?\u0003\u0017Y\u000bG.^3T_V\u00148-Z\u0001\bgR\u0014\u0018N\\4tY\u0011\t9!a\u0003\"\u0005\u0005%\u0011a\u0002#fM\u0006,H\u000e^\u0011\u0003\u0003\u001b\ta!\u00138Ts:\u001c\u0017!\n;fgRlU\u000f\u001c;j)\u0016t\u0017M\u001c;GKR\u001c\u0007NU3ta>t7/Z!mY>\u001c\u0017\r^8s)\ra\u00161\u0003\u0005\u0006e.\u0001\r\u0001\u000f\u0015\u0003\u0017QDcaC>\u0002\u0004\u0005eA\u0006BA\u0004\u0003\u0017\t\u0001\u0006^3ti\u001a+Go\u00195SKN\u0004xN\\:f\u00032dwnY1u_J<\u0016\u000e\u001e5pkR$VM\\1oiN$2\u0001XA\u0010\u0011\u0015\u0011H\u00021\u00019Q\taA\u000f\u000b\u0004\rw\u0006\r\u0011Q\u0005\u0017\u0005\u0003\u000f\tY!\u0001\nuKN$h)\u001a;d
Q\u0016\u00148\t[1oO\u0016\u001c\bfA\u0007\u0002,A\u0019a-!\f\n\u0007\u0005=rM\u0001\u0003UKN$\u0018!\b;fgRlU\u000f\u001c;j)\u0016t\u0017M\u001c;GKR\u001c\u0007.\u001a:DQ\u0006tw-Z:)\u00079\tY#A\u0012uKN$X*[:tS:<g)\u001a;dQ\u0016\u00148\u000b^1si:{G/\u001b4jG\u0006$\u0018n\u001c8)\u0007=\tY#\u0001\u0018uKN$X*[:tS:<W*\u001e7uSR+g.\u00198u\r\u0016$8\r[3s'R\f'\u000f\u001e(pi&4\u0017nY1uS>t\u0007f\u0001\t\u0002,\u0005\u0001b/\u001a:jMf\fe\u000e\u001a*fY\u0016\f7/\u001a\u000b\n9\u0006\u0005\u00131JA+\u0003?Bq!a\u0011\u0012\u0001\u0004\t)%A\u0005bY2|7-\u0019;peB\u0019a%a\u0012\n\u0007\u0005%SCA\u0011DYV\u001cH/\u001a:MS:\\g)\u001a;dQJ+7\u000f]8og\u0016\fE\u000e\\8dCR|'\u000fC\u0004\u0002NE\u0001\r!a\u0014\u0002\rQ,g.\u00198u!\u0011q\u0012\u0011\u000b\u001d\n\u0007\u0005MsD\u0001\u0004PaRLwN\u001c\u0005\b\u0003/\n\u0002\u0019AA-\u0003)a\u0017N\\6D_:4\u0017n\u001a\t\u0004M\u0005m\u0013bAA/+\t\t2\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4\t\u000f\u0005\u0005\u0014\u00031\u0001\u0002d\u0005aQ\r\u001f9fGR,GmU5{KB\u0019a%!\u001a\n\u0007\u0005\u001dTCA\tGKR\u001c\u0007NU3ta>t7/Z*ju\u0016\fqbY8oM&<WO]3Ce>\\WM\u001d\u000b\b9\u00065\u0014qOAA\u0011\u001d\tyG\u0005a\u0001\u0003c\n1BZ3uG\",'\u000fU8pYB\u0019!&a\u001d\n\u0007\u0005UtCA\u0006GKR\u001c\u0007.\u001a:Q_>d\u0007bBA=%\u0001\u0007\u00111P\u0001\u000bi>$\u0018\r\u001c\"zi\u0016\u001c\bc\u0001\u0010\u0002~%\u0019\u0011qP\u0010\u0003\u0007%sG\u000fC\u0004\u0002\u0004J\u0001\r!a\u001f\u0002\u00115LgNQ=uKN$b!!\u0017\u0002\b\u0006-\u0005bBAE'\u0001\u0007\u00111P\u0001\u000fa\u0006\u0014H/\u001b;j_:\u0014\u0015\u0010^3t\u0011\u001d\tIh\u0005a\u0001\u0003w\u0002")
/* loaded from: input_file:kafka/server/link/ClusterLinkFetchResponseAllocatorTest.class */
public class ClusterLinkFetchResponseAllocatorTest {
    // Mocked broker configuration; its answers are stubbed in setUp().
    private final KafkaConfig brokerConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
    // Mutable property map read by the stubbed getInt(...) answer and filled by configureBroker(...).
    private final Map<String, Object> currentBrokerProps = (Map) Map$.MODULE$.empty();
    // Affinity handed out by the stubbed clusterLinkBackgroundThreadAffinity(); every test overwrites it.
    private ConfluentConfigs.ClusterLinkThreadAffinity threadAffinity = ConfluentConfigs.ClusterLinkThreadAffinity.TENANT;

    /** Returns the mocked broker configuration held by this test instance. */
    private KafkaConfig brokerConfig() {
        return brokerConfig;
    }

    /** Returns the mutable map of broker properties held by this test instance. */
    private Map<String, Object> currentBrokerProps() {
        return currentBrokerProps;
    }

    /**
     * Returns the thread affinity currently selected for the test.
     *
     * <p>The decompiler widened this accessor from {@code private} to {@code public}; the
     * visibility is left as generated.
     */
    public ConfluentConfigs.ClusterLinkThreadAffinity threadAffinity() {
        return threadAffinity;
    }

    /**
     * Scala-style setter for the {@code threadAffinity} field.
     *
     * @param clusterLinkThreadAffinity affinity to store
     */
    private void threadAffinity_$eq(ConfluentConfigs.ClusterLinkThreadAffinity clusterLinkThreadAffinity) {
        threadAffinity = clusterLinkThreadAffinity;
    }

    /**
     * Stubs the mocked broker configuration before each test: integer lookups are answered
     * through {@code $anonfun$setUp$1}, and the background thread affinity is answered with
     * whatever {@code threadAffinity()} returns at the time of the call.
     */
    @BeforeEach
    public void setUp() {
        KafkaConfig config = brokerConfig();
        Mockito.when(config.getInt(ArgumentMatchers.anyString()))
                .thenAnswer(invocation -> BoxesRunTime.boxToInteger($anonfun$setUp$1(this, invocation)));
        Mockito.when(config.clusterLinkBackgroundThreadAffinity())
                .thenAnswer(invocation -> threadAffinity());
    }

    /**
     * Runs once per named fetcher pool with LINK thread affinity. Four fetchers are registered
     * (two for "lkc1", one without a tenant, one for "lkc2"); each acquisition is then compared
     * with the expected size and released again through {@code verifyAndRelease}.
     *
     * @param str name of the fetcher pool to look up
     */
    @ParameterizedTest
    @ValueSource(strings = {"Default", "InSync"})
    public void testFetchResponseAllocator(String str) {
        FetcherPool pool = (FetcherPool) FetcherPool$.MODULE$.values()
                .find(candidate -> BoxesRunTime.boxToBoolean($anonfun$testFetchResponseAllocator$1(str, candidate)))
                .get();
        configureBroker(pool, 10000, 1000);
        threadAffinity_$eq(ConfluentConfigs.ClusterLinkThreadAffinity.LINK);
        ClusterLinkFetchResponseAllocator allocator = new ClusterLinkFetchResponseAllocator(brokerConfig(), pool);

        // Register four fetchers in total.
        allocator.onFetcherStart(new Some("lkc1"));
        allocator.onFetcherStart(new Some("lkc1"));
        allocator.onFetcherStart(None$.MODULE$);
        allocator.onFetcherStart(new Some("lkc2"));

        // Large link limits: the same size is expected regardless of the tenant passed in.
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(1250, 2500));
        verifyAndRelease(allocator, None$.MODULE$, linkConfig(100000, 200000), new FetchResponseSize(1250, 2500));
        verifyAndRelease(allocator, new Some("lkc3"), linkConfig(100000, 200000), new FetchResponseSize(1250, 2500));

        // Smaller link limits.
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(500, 2000), new FetchResponseSize(500, 2000));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(500, 200000), new FetchResponseSize(500, 2500));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(1500, 2000), new FetchResponseSize(1500, 2000));
    }

    /**
     * Runs once per named fetcher pool with TENANT thread affinity. Four fetchers are started
     * for "lkc1" and one for "lkc2"; the expected sizes differ per tenant, and acquiring for
     * the unregistered tenant "lkc3" is expected to raise {@link IllegalStateException}.
     *
     * @param str name of the fetcher pool to look up
     */
    @ParameterizedTest
    @ValueSource(strings = {"Default", "InSync"})
    public void testMultiTenantFetchResponseAllocator(String str) {
        FetcherPool pool = (FetcherPool) FetcherPool$.MODULE$.values()
                .find(candidate -> BoxesRunTime.boxToBoolean($anonfun$testMultiTenantFetchResponseAllocator$1(str, candidate)))
                .get();
        configureBroker(pool, 10000, 1000);
        threadAffinity_$eq(ConfluentConfigs.ClusterLinkThreadAffinity.TENANT);
        ClusterLinkFetchResponseAllocator allocator = new ClusterLinkFetchResponseAllocator(brokerConfig(), pool);

        // Nothing is allocated before any fetcher starts.
        Assertions.assertEquals(0L, allocator.minAllocatedForTenant());
        Assertions.assertEquals(0L, allocator.maxAllocatedForTenant());

        // Four fetchers for lkc1, one for lkc2.
        allocator.onFetcherStart(new Some("lkc1"));
        allocator.onFetcherStart(new Some("lkc1"));
        allocator.onFetcherStart(new Some("lkc1"));
        allocator.onFetcherStart(new Some("lkc1"));
        allocator.onFetcherStart(new Some("lkc2"));

        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(1000, 1250));
        verifyAndRelease(allocator, new Some("lkc2"), linkConfig(100000, 200000), new FetchResponseSize(2500, 5000));

        // lkc1 with smaller link limits.
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(500, 1200), new FetchResponseSize(500, 1200));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(500, 200000), new FetchResponseSize(500, 1250));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(750, 1000), new FetchResponseSize(750, 1000));

        // lkc2 with the same link limits.
        verifyAndRelease(allocator, new Some("lkc2"), linkConfig(500, 1200), new FetchResponseSize(500, 1200));
        verifyAndRelease(allocator, new Some("lkc2"), linkConfig(500, 200000), new FetchResponseSize(500, 5000));
        verifyAndRelease(allocator, new Some("lkc2"), linkConfig(750, 1000), new FetchResponseSize(750, 1000));

        // A tenant that never started a fetcher must be rejected and must not allocate anything.
        Assertions.assertThrows(IllegalStateException.class,
                () -> allocator.acquireFetchBuffer(new Some("lkc3"), linkConfig(100000, 200000)));
        Assertions.assertEquals(0L, allocator.totalAllocated().get());
    }

    /**
     * Runs once per named fetcher pool with LINK thread affinity and no tenants at all: four
     * tenant-less fetchers are started and each acquisition is verified and released.
     *
     * @param str name of the fetcher pool to look up
     */
    @ParameterizedTest
    @ValueSource(strings = {"Default", "InSync"})
    public void testFetchResponseAllocatorWithoutTenants(String str) {
        FetcherPool pool = (FetcherPool) FetcherPool$.MODULE$.values()
                .find(candidate -> BoxesRunTime.boxToBoolean($anonfun$testFetchResponseAllocatorWithoutTenants$1(str, candidate)))
                .get();
        configureBroker(pool, 10000, 1000);
        threadAffinity_$eq(ConfluentConfigs.ClusterLinkThreadAffinity.LINK);
        ClusterLinkFetchResponseAllocator allocator = new ClusterLinkFetchResponseAllocator(brokerConfig(), pool);

        // Four fetchers, none of them tied to a tenant.
        allocator.onFetcherStart(None$.MODULE$);
        allocator.onFetcherStart(None$.MODULE$);
        allocator.onFetcherStart(None$.MODULE$);
        allocator.onFetcherStart(None$.MODULE$);

        verifyAndRelease(allocator, None$.MODULE$, linkConfig(100000, 200000), new FetchResponseSize(1250, 2500));
        verifyAndRelease(allocator, None$.MODULE$, linkConfig(500, 2000), new FetchResponseSize(500, 2000));
        verifyAndRelease(allocator, None$.MODULE$, linkConfig(500, 200000), new FetchResponseSize(500, 2500));
        verifyAndRelease(allocator, None$.MODULE$, linkConfig(1500, 2000), new FetchResponseSize(1500, 2000));
    }

    /**
     * Exercises the allocator with LINK thread affinity on the default pool while fetchers
     * start and shut down and buffers are still held, checking the granted sizes and the
     * running total after every phase.
     */
    @Test
    public void testFetcherChanges() {
        FetcherPool$Default$ defaultPool = FetcherPool$Default$.MODULE$;
        configureBroker(defaultPool, 120000, 1000);
        threadAffinity_$eq(ConfluentConfigs.ClusterLinkThreadAffinity.LINK);
        ClusterLinkConfig sharedConfig = linkConfig(5000, 50000);
        ClusterLinkFetchResponseAllocator allocator = new ClusterLinkFetchResponseAllocator(brokerConfig(), defaultPool);

        // Phase 1: two fetchers, both receive the link's own limits.
        allocator.onFetcherStart(None$.MODULE$);
        allocator.onFetcherStart(None$.MODULE$);
        Assertions.assertEquals(new FetchResponseSize(5000, 50000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        Assertions.assertEquals(new FetchResponseSize(5000, 50000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        Assertions.assertEquals(100000L, allocator.totalAllocated().get());

        // Phase 2: two more fetchers start while the first two buffers are still held.
        allocator.onFetcherStart(None$.MODULE$);
        allocator.onFetcherStart(None$.MODULE$);
        Assertions.assertEquals(new FetchResponseSize(5000, 20000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        Assertions.assertEquals(new FetchResponseSize(1000, 1000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        Assertions.assertEquals(121000L, allocator.totalAllocated().get());

        // Phase 3: release each held buffer in turn and re-acquire; every grant is now 30000.
        allocator.releaseFetchBuffer(None$.MODULE$, 50000);
        Assertions.assertEquals(new FetchResponseSize(5000, 30000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        allocator.releaseFetchBuffer(None$.MODULE$, 20000);
        Assertions.assertEquals(new FetchResponseSize(5000, 30000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        allocator.releaseFetchBuffer(None$.MODULE$, 50000);
        Assertions.assertEquals(new FetchResponseSize(5000, 30000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        allocator.releaseFetchBuffer(None$.MODULE$, 1000);
        Assertions.assertEquals(new FetchResponseSize(5000, 30000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        Assertions.assertEquals(120000L, allocator.totalAllocated().get());

        // Phase 4: release all four buffers, then acquire four again.
        for (int round = 0; round < 4; round++) {
            allocator.releaseFetchBuffer(None$.MODULE$, 30000);
        }
        for (int round = 0; round < 4; round++) {
            Assertions.assertEquals(new FetchResponseSize(5000, 30000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        }

        // Phase 5: one fetcher shuts down; the remaining three are each granted 40000.
        allocator.releaseFetchBuffer(None$.MODULE$, 30000);
        allocator.onFetcherShutdown(None$.MODULE$);
        allocator.releaseFetchBuffer(None$.MODULE$, 30000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        allocator.releaseFetchBuffer(None$.MODULE$, 30000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        allocator.releaseFetchBuffer(None$.MODULE$, 30000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(None$.MODULE$, sharedConfig));
        Assertions.assertEquals(120000L, allocator.totalAllocated().get());
    }

    /**
     * Exercises the allocator with TENANT thread affinity on the default pool while tenants
     * "lkc1".."lkc4" start and stop fetchers and buffers are still held, checking the granted
     * sizes, the running total and the per-tenant min/max after every phase.
     */
    @Test
    public void testMultiTenantFetcherChanges() {
        FetcherPool$Default$ defaultPool = FetcherPool$Default$.MODULE$;
        configureBroker(defaultPool, 120000, 1000);
        threadAffinity_$eq(ConfluentConfigs.ClusterLinkThreadAffinity.TENANT);
        ClusterLinkConfig sharedConfig = linkConfig(5000, 50000);
        ClusterLinkFetchResponseAllocator allocator = new ClusterLinkFetchResponseAllocator(brokerConfig(), defaultPool);

        // Phase 1: two tenants, both receive the link's own limits.
        allocator.onFetcherStart(new Some("lkc1"));
        allocator.onFetcherStart(new Some("lkc2"));
        Assertions.assertEquals(new FetchResponseSize(5000, 50000), allocator.acquireFetchBuffer(new Some("lkc1"), sharedConfig));
        Assertions.assertEquals(new FetchResponseSize(5000, 50000), allocator.acquireFetchBuffer(new Some("lkc2"), sharedConfig));
        Assertions.assertEquals(100000L, allocator.totalAllocated().get());

        // Phase 2: two more tenants start while the first two buffers are still held.
        allocator.onFetcherStart(new Some("lkc3"));
        allocator.onFetcherStart(new Some("lkc4"));
        Assertions.assertEquals(new FetchResponseSize(5000, 20000), allocator.acquireFetchBuffer(new Some("lkc4"), sharedConfig));
        Assertions.assertEquals(new FetchResponseSize(1000, 1000), allocator.acquireFetchBuffer(new Some("lkc3"), sharedConfig));
        Assertions.assertEquals(121000L, allocator.totalAllocated().get());
        Assertions.assertEquals(50000L, allocator.maxAllocatedForTenant());
        Assertions.assertEquals(1000L, allocator.minAllocatedForTenant());

        // Phase 3: each tenant releases and re-acquires; every grant is now 30000.
        allocator.releaseFetchBuffer(new Some("lkc1"), 50000);
        Assertions.assertEquals(new FetchResponseSize(5000, 30000), allocator.acquireFetchBuffer(new Some("lkc1"), sharedConfig));
        allocator.releaseFetchBuffer(new Some("lkc4"), 20000);
        Assertions.assertEquals(new FetchResponseSize(5000, 30000), allocator.acquireFetchBuffer(new Some("lkc4"), sharedConfig));
        allocator.releaseFetchBuffer(new Some("lkc2"), 50000);
        Assertions.assertEquals(new FetchResponseSize(5000, 30000), allocator.acquireFetchBuffer(new Some("lkc2"), sharedConfig));
        allocator.releaseFetchBuffer(new Some("lkc3"), 1000);
        Assertions.assertEquals(new FetchResponseSize(5000, 30000), allocator.acquireFetchBuffer(new Some("lkc3"), sharedConfig));
        Assertions.assertEquals(120000L, allocator.totalAllocated().get());
        Assertions.assertEquals(30000L, allocator.maxAllocatedForTenant());
        Assertions.assertEquals(30000L, allocator.minAllocatedForTenant());

        // Phase 4: all four tenants release, then all four acquire again.
        for (int tenantIdx = 1; tenantIdx <= 4; tenantIdx++) {
            allocator.releaseFetchBuffer(new Some("lkc" + tenantIdx), 30000);
        }
        for (int tenantIdx = 1; tenantIdx <= 4; tenantIdx++) {
            Assertions.assertEquals(new FetchResponseSize(5000, 30000), allocator.acquireFetchBuffer(new Some("lkc" + tenantIdx), sharedConfig));
        }

        // Phase 5: lkc4 shuts down; the remaining three tenants are each granted 40000.
        allocator.releaseFetchBuffer(new Some("lkc4"), 30000);
        allocator.onFetcherShutdown(new Some("lkc4"));
        allocator.releaseFetchBuffer(new Some("lkc1"), 30000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(new Some("lkc1"), sharedConfig));
        allocator.releaseFetchBuffer(new Some("lkc2"), 30000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(new Some("lkc2"), sharedConfig));
        allocator.releaseFetchBuffer(new Some("lkc3"), 30000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(new Some("lkc3"), sharedConfig));
        Assertions.assertEquals(120000L, allocator.totalAllocated().get());
        Assertions.assertEquals(40000L, allocator.maxAllocatedForTenant());
        Assertions.assertEquals(40000L, allocator.minAllocatedForTenant());

        // Phase 6: lkc2 starts a second fetcher; its two acquisitions are granted 20000 each.
        allocator.onFetcherStart(new Some("lkc2"));
        allocator.releaseFetchBuffer(new Some("lkc1"), 40000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(new Some("lkc1"), sharedConfig));
        allocator.releaseFetchBuffer(new Some("lkc2"), 40000);
        Assertions.assertEquals(new FetchResponseSize(5000, 20000), allocator.acquireFetchBuffer(new Some("lkc2"), sharedConfig));
        Assertions.assertEquals(new FetchResponseSize(5000, 20000), allocator.acquireFetchBuffer(new Some("lkc2"), sharedConfig));
        allocator.releaseFetchBuffer(new Some("lkc3"), 40000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(new Some("lkc3"), sharedConfig));
        Assertions.assertEquals(120000L, allocator.totalAllocated().get());
        Assertions.assertEquals(40000L, allocator.maxAllocatedForTenant());
        Assertions.assertEquals(40000L, allocator.minAllocatedForTenant());

        // Phase 7: lkc2 drops back to a single fetcher and is granted 40000 again.
        allocator.releaseFetchBuffer(new Some("lkc2"), 20000);
        allocator.onFetcherShutdown(new Some("lkc2"));
        allocator.releaseFetchBuffer(new Some("lkc1"), 40000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(new Some("lkc1"), sharedConfig));
        allocator.releaseFetchBuffer(new Some("lkc2"), 20000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(new Some("lkc2"), sharedConfig));
        allocator.releaseFetchBuffer(new Some("lkc3"), 40000);
        Assertions.assertEquals(new FetchResponseSize(5000, 40000), allocator.acquireFetchBuffer(new Some("lkc3"), sharedConfig));
        Assertions.assertEquals(120000L, allocator.totalAllocated().get());
        Assertions.assertEquals(40000L, allocator.maxAllocatedForTenant());
        Assertions.assertEquals(40000L, allocator.minAllocatedForTenant());
    }

    /**
     * With LINK thread affinity on the default pool, checks the sizes granted when buffers are
     * acquired without a preceding {@code onFetcherStart}, after one and two starts, and after
     * more shutdown notifications than starts.
     */
    @Test
    public void testMissingFetcherStartNotification() {
        FetcherPool$Default$ defaultPool = FetcherPool$Default$.MODULE$;
        configureBroker(defaultPool, 10000, 1000);
        threadAffinity_$eq(ConfluentConfigs.ClusterLinkThreadAffinity.LINK);
        ClusterLinkFetchResponseAllocator allocator = new ClusterLinkFetchResponseAllocator(brokerConfig(), defaultPool);

        // No fetcher has started yet.
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));
        verifyAndRelease(allocator, None$.MODULE$, linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));
        verifyAndRelease(allocator, new Some("lkc2"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));

        // One fetcher started.
        allocator.onFetcherStart(new Some("lkc1"));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));
        verifyAndRelease(allocator, new Some("lkc2"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));

        // Two fetchers started: the granted sizes are halved.
        allocator.onFetcherStart(new Some("lkc1"));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(2500, 5000));
        verifyAndRelease(allocator, new Some("lkc2"), linkConfig(100000, 200000), new FetchResponseSize(2500, 5000));

        // First shutdown: back to the single-fetcher sizes.
        allocator.onFetcherShutdown(new Some("lkc1"));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));
        verifyAndRelease(allocator, new Some("lkc2"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));

        // Second shutdown: sizes stay the same.
        allocator.onFetcherShutdown(new Some("lkc1"));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));
        verifyAndRelease(allocator, new Some("lkc2"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));

        // Third shutdown (one more than the number of starts): sizes still stay the same.
        allocator.onFetcherShutdown(new Some("lkc1"));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));
        verifyAndRelease(allocator, new Some("lkc2"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));
    }

    /**
     * With TENANT thread affinity on the default pool, checks that acquiring a buffer for a
     * tenant with no started fetcher raises {@link IllegalStateException}, and that the sizes
     * granted to "lkc1" follow its start and shutdown notifications.
     */
    @Test
    public void testMissingMultiTenantFetcherStartNotification() {
        FetcherPool$Default$ defaultPool = FetcherPool$Default$.MODULE$;
        configureBroker(defaultPool, 10000, 1000);
        threadAffinity_$eq(ConfluentConfigs.ClusterLinkThreadAffinity.TENANT);
        ClusterLinkFetchResponseAllocator allocator = new ClusterLinkFetchResponseAllocator(brokerConfig(), defaultPool);

        // Nothing has started: every acquisition is rejected, with or without a tenant.
        Assertions.assertThrows(IllegalStateException.class,
                () -> allocator.acquireFetchBuffer(new Some("lkc1"), linkConfig(100000, 200000)));
        Assertions.assertThrows(IllegalStateException.class,
                () -> allocator.acquireFetchBuffer(new Some("lkc2"), linkConfig(100000, 200000)));
        Assertions.assertThrows(IllegalStateException.class,
                () -> allocator.acquireFetchBuffer(None$.MODULE$, linkConfig(100000, 200000)));

        // lkc1 has one fetcher; lkc2 is still rejected.
        allocator.onFetcherStart(new Some("lkc1"));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));
        Assertions.assertThrows(IllegalStateException.class,
                () -> allocator.acquireFetchBuffer(new Some("lkc2"), linkConfig(100000, 200000)));

        // lkc1 has two fetchers: the granted sizes are halved.
        allocator.onFetcherStart(new Some("lkc1"));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(2500, 5000));
        Assertions.assertThrows(IllegalStateException.class,
                () -> allocator.acquireFetchBuffer(new Some("lkc2"), linkConfig(100000, 200000)));

        // lkc1 is back to one fetcher.
        allocator.onFetcherShutdown(new Some("lkc1"));
        verifyAndRelease(allocator, new Some("lkc1"), linkConfig(100000, 200000), new FetchResponseSize(5000, 10000));
        Assertions.assertThrows(IllegalStateException.class,
                () -> allocator.acquireFetchBuffer(new Some("lkc2"), linkConfig(100000, 200000)));

        // lkc1 has no fetchers left: it is rejected like lkc2.
        allocator.onFetcherShutdown(new Some("lkc1"));
        Assertions.assertThrows(IllegalStateException.class,
                () -> allocator.acquireFetchBuffer(new Some("lkc1"), linkConfig(100000, 200000)));
        Assertions.assertThrows(IllegalStateException.class,
                () -> allocator.acquireFetchBuffer(new Some("lkc2"), linkConfig(100000, 200000)));
    }

    /**
     * Acquires a fetch buffer and checks it against the expectation, then releases it and
     * checks that nothing remains allocated.
     *
     * @param clusterLinkFetchResponseAllocator allocator under test
     * @param option tenant passed to the allocator, or an empty option
     * @param clusterLinkConfig link configuration passed to the allocator
     * @param fetchResponseSize size the allocator is expected to grant
     */
    private void verifyAndRelease(ClusterLinkFetchResponseAllocator clusterLinkFetchResponseAllocator, Option<String> option, ClusterLinkConfig clusterLinkConfig, FetchResponseSize fetchResponseSize) {
        FetchResponseSize granted = clusterLinkFetchResponseAllocator.acquireFetchBuffer(option, clusterLinkConfig);
        Assertions.assertEquals(fetchResponseSize, granted);
        // While the buffer is held, the total must equal exactly the granted response size.
        Assertions.assertEquals(fetchResponseSize.responseSize(), clusterLinkFetchResponseAllocator.totalAllocated().get());

        clusterLinkFetchResponseAllocator.releaseFetchBuffer(option, fetchResponseSize.responseSize());
        Assertions.assertEquals(0L, clusterLinkFetchResponseAllocator.totalAllocated().get());
    }

    /**
     * Stores the total and minimum fetch-response byte limits in {@code currentBrokerProps}
     * under the property names that belong to the given fetcher pool.
     *
     * @param fetcherPool pool whose property names are used (Default or InSync)
     * @param i value stored under the pool's "total.bytes" property
     * @param i2 value stored under the pool's "min.bytes" property
     * @throws MatchError if the pool is neither Default nor InSync
     */
    private void configureBroker(FetcherPool fetcherPool, int i, int i2) {
        final String totalBytesKey;
        final String minBytesKey;
        if (FetcherPool$Default$.MODULE$.equals(fetcherPool)) {
            totalBytesKey = "confluent.cluster.link.fetch.response.total.bytes";
            minBytesKey = "confluent.cluster.link.fetch.response.min.bytes";
        } else if (FetcherPool$InSync$.MODULE$.equals(fetcherPool)) {
            totalBytesKey = "confluent.cluster.link.insync.fetch.response.total.bytes";
            minBytesKey = "confluent.cluster.link.insync.fetch.response.min.bytes";
        } else {
            throw new MatchError(fetcherPool);
        }

        Map<String, Object> props = currentBrokerProps();
        props.$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(totalBytesKey), BoxesRunTime.boxToInteger(i)));
        props.$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(minBytesKey), BoxesRunTime.boxToInteger(i2)));
    }

    /**
     * Builds a fresh mocked {@code ClusterLinkConfig} with the two fetch limits stubbed.
     *
     * @param i value returned by {@code replicaFetchMaxBytes()}
     * @param i2 value returned by {@code replicaFetchResponseMaxBytes()}
     * @return the stubbed mock
     */
    private ClusterLinkConfig linkConfig(int i, int i2) {
        ClusterLinkConfig mockedConfig = Mockito.mock(ClusterLinkConfig.class);
        Mockito.when(mockedConfig.replicaFetchResponseMaxBytes()).thenReturn(Integer.valueOf(i2));
        Mockito.when(mockedConfig.replicaFetchMaxBytes()).thenReturn(Integer.valueOf(i));
        return mockedConfig;
    }

    /**
     * Answer body for the stubbed {@code getInt}: looks up the requested property name in the
     * test's {@code currentBrokerProps} and parses the stored value's string form as an int.
     *
     * @param clusterLinkFetchResponseAllocatorTest test instance whose properties are consulted
     * @param invocationOnMock intercepted call; argument 0 is the property name
     * @return the stored value as an int
     */
    public static final /* synthetic */ int $anonfun$setUp$1(ClusterLinkFetchResponseAllocatorTest clusterLinkFetchResponseAllocatorTest, InvocationOnMock invocationOnMock) {
        String requestedKey = (String) invocationOnMock.getArgument(0);
        Object storedValue = clusterLinkFetchResponseAllocatorTest.currentBrokerProps().apply(requestedKey);
        return StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(storedValue.toString()));
    }

    /**
     * Null-safe predicate used by {@code testFetchResponseAllocator} to pick a pool by name.
     *
     * @param str wanted pool name
     * @param fetcherPool candidate pool
     * @return {@code true} when the candidate's name equals {@code str} (both null counts as equal)
     */
    public static final /* synthetic */ boolean $anonfun$testFetchResponseAllocator$1(String str, FetcherPool fetcherPool) {
        String poolName = fetcherPool.name();
        if (poolName == null) {
            return str == null;
        }
        return poolName.equals(str);
    }

    /**
     * Null-safe predicate used by {@code testMultiTenantFetchResponseAllocator} to pick a pool
     * by name.
     *
     * @param str wanted pool name
     * @param fetcherPool candidate pool
     * @return {@code true} when the candidate's name equals {@code str} (both null counts as equal)
     */
    public static final /* synthetic */ boolean $anonfun$testMultiTenantFetchResponseAllocator$1(String str, FetcherPool fetcherPool) {
        String poolName = fetcherPool.name();
        if (poolName == null) {
            return str == null;
        }
        return poolName.equals(str);
    }

    /**
     * Null-safe predicate used by {@code testFetchResponseAllocatorWithoutTenants} to pick a
     * pool by name.
     *
     * @param str wanted pool name
     * @param fetcherPool candidate pool
     * @return {@code true} when the candidate's name equals {@code str} (both null counts as equal)
     */
    public static final /* synthetic */ boolean $anonfun$testFetchResponseAllocatorWithoutTenants$1(String str, FetcherPool fetcherPool) {
        String poolName = fetcherPool.name();
        if (poolName == null) {
            return str == null;
        }
        return poolName.equals(str);
    }
}
