package kafka.server;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import kafka.utils.MockTime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchMetadata;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.Builder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: FetchSessionTest.scala */
@Timeout(120)
@ScalaSignature(bytes = "\u0006\u0005\u0005-b\u0001B\n\u0015\u0001eAQ\u0001\t\u0001\u0005\u0002\u0005BQ\u0001\n\u0001\u0005\u0002\u0015BQA\u000e\u0001\u0005\u0002]BQ!\u0012\u0001\u0005\n\u0019CQ\u0001\u0015\u0001\u0005\u0002\u0015BQA\u0015\u0001\u0005\u0002\u0015Bq\u0001\u0016\u0001C\u0002\u0013%Q\u000b\u0003\u0004h\u0001\u0001\u0006IA\u0016\u0005\u0006Q\u0002!\t!\n\u0005\u0006U\u0002!\t!\n\u0005\u0006Y\u0002!\t!\n\u0005\u0006]\u0002!\t!\n\u0005\u0006a\u0002!\t!\n\u0005\u0006e\u0002!\t!\n\u0005\u0006i\u0002!\t!\n\u0005\u0006m\u0002!\t!\n\u0005\u0006q\u0002!\t!\n\u0005\u0006u\u0002!Ia\u001f\u0002\u0011\r\u0016$8\r[*fgNLwN\u001c+fgRT!!\u0006\f\u0002\rM,'O^3s\u0015\u00059\u0012!B6bM.\f7\u0001A\n\u0003\u0001i\u0001\"a\u0007\u0010\u000e\u0003qQ\u0011!H\u0001\u0006g\u000e\fG.Y\u0005\u0003?q\u0011a!\u00118z%\u00164\u0017A\u0002\u001fj]&$h\bF\u0001#!\t\u0019\u0003!D\u0001\u0015\u0003A!Xm\u001d;OK^\u001cVm]:j_:LE\rF\u0001'!\tYr%\u0003\u0002)9\t!QK\\5uQ\t\u0011!\u0006\u0005\u0002,i5\tAF\u0003\u0002.]\u0005\u0019\u0011\r]5\u000b\u0005=\u0002\u0014a\u00026va&$XM\u001d\u0006\u0003cI\nQA[;oSRT\u0011aM\u0001\u0004_J<\u0017BA\u001b-\u0005\u0011!Vm\u001d;\u0002'\u0005\u001c8/\u001a:u\u0007\u0006\u001c\u0007.Z\"p]R\f\u0017N\\:\u0015\u0007\u0019BT\bC\u0003:\u0007\u0001\u0007!(A\u0003dC\u000eDW\r\u0005\u0002$w%\u0011A\b\u0006\u0002\u0012\r\u0016$8\r[*fgNLwN\\\"bG\",\u0007\"\u0002 
\u0004\u0001\u0004y\u0014AC:fgNLwN\\%egB\u00191\u0004\u0011\"\n\u0005\u0005c\"A\u0003\u001fsKB,\u0017\r^3e}A\u00111dQ\u0005\u0003\tr\u00111!\u00138u\u0003-!W/\\7z\u0007J,\u0017\r^3\u0015\u0005\u001ds\u0005C\u0001%L\u001d\t\u0019\u0013*\u0003\u0002K)\u0005aa)\u001a;dQN+7o]5p]&\u0011A*\u0014\u0002\n\u0007\u0006\u001b\u0005*R0N\u0003BS!A\u0013\u000b\t\u000b=#\u0001\u0019\u0001\"\u0002\tML'0Z\u0001\u0011i\u0016\u001cHoU3tg&|gnQ1dQ\u0016D#!\u0002\u0016\u00021Q,7\u000f\u001e*fg&TXmQ1dQ\u0016$7+Z:tS>t7\u000f\u000b\u0002\u0007U\u0005yQ)\u0014)U3~\u0003\u0016I\u0015+`\u0019&\u001bF+F\u0001W!\r9FLX\u0007\u00021*\u0011\u0011LW\u0001\u0005kRLGNC\u0001\\\u0003\u0011Q\u0017M^1\n\u0005uC&\u0001\u0002'jgR\u0004\"aX3\u000e\u0003\u0001T!!\u00192\u0002\r\r|W.\\8o\u0015\t92M\u0003\u0002ee\u00051\u0011\r]1dQ\u0016L!A\u001a1\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\u0006\u0001R)\u0014)U3~\u0003\u0016I\u0015+`\u0019&\u001bF\u000bI\u0001\u0016i\u0016\u001cHoQ1dQ\u0016$G*Z1eKJ,\u0005o\\2iQ\tI!&\u0001\u000buKN$H*Y:u\r\u0016$8\r[3e\u000bB|7\r\u001b\u0015\u0003\u0015)\n\u0011\u0003^3ti\u001a+Go\u00195SKF,Xm\u001d;tQ\tY!&A\u000euKN$\u0018J\\2sK6,g\u000e^1m\r\u0016$8\r[*fgNLwN\u001c\u0015\u0003\u0019)\n!\u0004^3ti\u001a+Go\u00195TKN\u001c\u0018n\u001c8FqBL'/\u0019;j_:D#!\u0004\u0016\u0002;Q,7\u000f\u001e)sSZLG.Z4fIN+7o]5p]\"\u000bg\u000e\u001a7j]\u001eD#A\u0004\u0016\u00021Q,7\u000f\u001e.fe>\u001c\u0016N_3GKR\u001c\u0007nU3tg&|g\u000e\u000b\u0002\u0010U\u0005\u0011B/Z:u\t&4XM]4j]\u001e,\u0005o\\2iQ\t\u0001\"&\u0001\u0016uKN$H)\u001a9sS>\u0014\u0018\u000e^5{KN\u0004\u0016M\u001d;ji&|gn],ji\"\u0014VmY8sIN|e\u000e\\=)\u0005EQ\u0013!F1tg\u0016\u0014H\u000fU1si&$\u0018n\u001c8t\u001fJ$WM\u001d\u000b\u0005Mq\f\u0019\u0001C\u0003~%\u0001\u0007a0A\u0004d_:$X\r\u001f;\u0011\u0005\rz\u0018bAA\u0001)\taa)\u001a;dQ\u000e{g\u000e^3yi\"9\u0011Q\u0001\nA\u0002\u0005\u001d\u0011A\u00039beRLG/[8ogB)\u0011\u0011BA\r=:!\u00111BA\u000b\u001d\u0011\ti!a\u0005\u000e\u0005\u0005=!bAA\t1\u00051AH]8pizJ\u0011!H\u0005
\u0004\u0003/a\u0012a\u00029bG.\fw-Z\u0005\u0005\u00037\tiBA\u0002TKFT1!a\u0006\u001dQ\u001d\u0001\u0011\u0011EA\u0014\u0003S\u00012aKA\u0012\u0013\r\t)\u0003\f\u0002\b)&lWm\\;u\u0003\u00151\u0018\r\\;f=\u0005A\b")
/* loaded from: input_file:kafka/server/FetchSessionTest.class */
public class FetchSessionTest {
    /**
     * Immutable empty partition list; the tests pass it as the third argument of
     * {@code FetchManager.newContext} when they have no partitions to supply there.
     */
    private final List<TopicPartition> EMPTY_PART_LIST = Collections.unmodifiableList(new ArrayList<>());

    /**
     * Requests 10001 session ids from a fresh cache and checks that each one is
     * strictly positive.
     */
    @Test
    public void testNewSessionId() {
        FetchSessionCache cache = new FetchSessionCache(3, 100L);
        // Bounds mirror the Scala range `0 to 10000`, which is inclusive at both ends.
        for (int attempt = 0; attempt <= 10000; attempt++) {
            Assertions.assertTrue(cache.newSessionId() > 0);
        }
    }

    /**
     * Asserts that every session id in {@code seq} can be looked up in
     * {@code fetchSessionCache}, and that the cache size equals the number of ids given.
     *
     * @param fetchSessionCache the cache under test
     * @param seq the (boxed) session ids expected to be present
     */
    public void assertCacheContains(FetchSessionCache fetchSessionCache, Seq<Object> seq) {
        int position = 0;
        scala.collection.Iterator<Object> ids = seq.iterator();
        while (ids.hasNext()) {
            int sessionId = BoxesRunTime.unboxToInt(ids.next());
            position++;
            // The failure message reports the 1-based position of the missing id.
            String message = "Missing session " + position + " out of " + seq.size() + "(" + sessionId + ")";
            Assertions.assertTrue(fetchSessionCache.get(sessionId).isDefined(), message);
        }
        Assertions.assertEquals(seq.size(), fetchSessionCache.size());
    }

    /**
     * Builds a collection holding {@code i} cached partitions of topic "test",
     * with partition numbers 0 (inclusive) to {@code i} (exclusive).
     *
     * <p>JADX note: the decompiler widened this method's access from private.
     *
     * @param i number of partitions to create; also used as the collection's initial size argument
     * @return the populated collection
     */
    public ImplicitLinkedHashCollection<CachedPartition> dummyCreate(int i) {
        ImplicitLinkedHashCollection<CachedPartition> partitions = new ImplicitLinkedHashCollection<>(i);
        for (int partition = 0; partition < i; partition++) {
            partitions.add(new CachedPartition("test", partition));
        }
        return partitions;
    }

    /**
     * Walks a cache built with arguments {@code (3, 100L)} through a series of session
     * creations and touches, checking after each step which session ids it still holds.
     * A return value of 0 from {@code maybeCreateSession} is asserted wherever the test
     * expects no session to be created.
     */
    @Test
    public void testSessionCache() {
        FetchSessionCache cache = new FetchSessionCache(3, 100L);
        Assertions.assertEquals(0, cache.size());

        // Three sessions are created; the next two attempts are expected to return 0.
        int id1 = cache.maybeCreateSession(0L, false, 10, () -> dummyCreate(10));
        int id2 = cache.maybeCreateSession(10L, false, 20, () -> dummyCreate(20));
        int id3 = cache.maybeCreateSession(20L, false, 30, () -> dummyCreate(30));
        int rejected40 = cache.maybeCreateSession(30L, false, 40, () -> dummyCreate(40));
        Assertions.assertEquals(0, rejected40);
        int rejected5 = cache.maybeCreateSession(40L, false, 5, () -> dummyCreate(5));
        Assertions.assertEquals(0, rejected5);
        assertCacheContains(cache, ScalaRunTime$.MODULE$.wrapIntArray(new int[]{id1, id2, id3}));

        // After touching the first session at 200, a creation at 210 is expected to replace id2.
        FetchSession firstSession = (FetchSession) cache.get(id1).get();
        cache.touch(firstSession, 200L);
        int id4 = cache.maybeCreateSession(210L, false, 11, () -> dummyCreate(11));
        assertCacheContains(cache, ScalaRunTime$.MODULE$.wrapIntArray(new int[]{id1, id3, id4}));

        // All three remaining sessions are touched; a creation at 410 is expected to replace id1.
        cache.touch((FetchSession) cache.get(id1).get(), 400L);
        cache.touch((FetchSession) cache.get(id3).get(), 390L);
        cache.touch((FetchSession) cache.get(id4).get(), 400L);
        int id5 = cache.maybeCreateSession(410L, false, 50, () -> dummyCreate(50));
        assertCacheContains(cache, ScalaRunTime$.MODULE$.wrapIntArray(new int[]{id3, id4, id5}));

        // With the second argument false the attempt returns 0; with true a session is
        // created and id4 is expected to drop out.
        int rejectedAgain = cache.maybeCreateSession(410L, false, 5, () -> dummyCreate(5));
        Assertions.assertEquals(0, rejectedAgain);
        int id7 = cache.maybeCreateSession(410L, true, 5, () -> dummyCreate(5));
        assertCacheContains(cache, ScalaRunTime$.MODULE$.wrapIntArray(new int[]{id3, id5, id7}));
    }

    /**
     * Tracks {@code totalPartitions()}, {@code size()} and the evictions meter count of a
     * cache built with arguments {@code (2, 100L)} while sessions are created, replaced,
     * removed, and finally shrunk by removing one partition directly from a session's
     * partition map.
     */
    @Test
    public void testResizeCachedSessions() {
        FetchSessionCache cache = new FetchSessionCache(2, 100L);
        Assertions.assertEquals(0L, cache.totalPartitions());
        Assertions.assertEquals(0, cache.size());
        Assertions.assertEquals(0L, cache.evictionsMeter().count());

        // First session, two partitions.
        int id1 = cache.maybeCreateSession(0L, false, 2, () -> dummyCreate(2));
        Assertions.assertTrue(id1 > 0);
        assertCacheContains(cache, ScalaRunTime$.MODULE$.wrapIntArray(new int[]{id1}));
        FetchSession session1 = (FetchSession) cache.get(id1).get();
        Assertions.assertEquals(2, session1.size());
        Assertions.assertEquals(2L, cache.totalPartitions());
        Assertions.assertEquals(1, cache.size());
        Assertions.assertEquals(0L, cache.evictionsMeter().count());

        // Second session, four partitions; nothing has been evicted yet.
        int id2 = cache.maybeCreateSession(0L, false, 4, () -> dummyCreate(4));
        FetchSession session2 = (FetchSession) cache.get(id2).get();
        Assertions.assertTrue(id2 > 0);
        assertCacheContains(cache, ScalaRunTime$.MODULE$.wrapIntArray(new int[]{id1, id2}));
        Assertions.assertEquals(6L, cache.totalPartitions());
        Assertions.assertEquals(2, cache.size());
        Assertions.assertEquals(0L, cache.evictionsMeter().count());

        // Both sessions are touched at 200; the third creation is expected to replace id1
        // and bump the eviction count to 1.
        cache.touch(session1, 200L);
        cache.touch(session2, 200L);
        int id3 = cache.maybeCreateSession(200L, false, 5, () -> dummyCreate(5));
        Assertions.assertTrue(id3 > 0);
        assertCacheContains(cache, ScalaRunTime$.MODULE$.wrapIntArray(new int[]{id2, id3}));
        Assertions.assertEquals(9L, cache.totalPartitions());
        Assertions.assertEquals(2, cache.size());
        Assertions.assertEquals(1L, cache.evictionsMeter().count());

        // Explicit removal leaves the eviction count at 1.
        cache.remove(id3);
        assertCacheContains(cache, ScalaRunTime$.MODULE$.wrapIntArray(new int[]{id2}));
        Assertions.assertEquals(1, cache.size());
        Assertions.assertEquals(1L, cache.evictionsMeter().count());
        Assertions.assertEquals(4L, cache.totalPartitions());

        // Drop one partition straight from the session's map: size() becomes 3 while
        // cachedSize() still reports 4; after touch() the cache total is asserted to be 3.
        Iterator<?> partitionIter = session2.partitionMap().iterator();
        partitionIter.next();
        partitionIter.remove();
        Assertions.assertEquals(3, session2.size());
        Assertions.assertEquals(4, session2.cachedSize());
        cache.touch(session2, session2.lastUsedMs());
        Assertions.assertEquals(3L, cache.totalPartitions());
    }

    /**
     * Accessor for the {@code EMPTY_PART_LIST} field.
     *
     * @return the list stored in the field of the same name
     */
    private List<TopicPartition> EMPTY_PART_LIST() {
        return EMPTY_PART_LIST;
    }

    /**
     * Checks the leader epochs reported by {@code cachedLeaderEpochs$1} for three
     * consecutive contexts created through the same {@code FetchManager}:
     * <ol>
     *   <li>an initial request where foo-0 has no epoch, foo-1 has epoch 1 and bar-1 has epoch 2;</li>
     *   <li>a follow-up request (session epoch 1) with no partition data, which must report
     *       the same epochs;</li>
     *   <li>a request (session epoch 2) that sends new epochs for all three partitions.</li>
     * </ol>
     */
    @Test
    public void testCachedLeaderEpoch() {
        FetchManager fetchManager = new FetchManager(new MockTime(), new FetchSessionCache(10, 1000L));
        TopicPartition tp0 = new TopicPartition("foo", 0);
        TopicPartition tp1 = new TopicPartition("foo", 1);
        TopicPartition tp2 = new TopicPartition("bar", 1);

        // Request 1: the fourth constructor argument is the epoch the assertions below read back.
        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> requestData1 = new LinkedHashMap<>();
        requestData1.put(tp0, new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        requestData1.put(tp1, new FetchRequest.PartitionData(10L, 0L, 100, Optional.of(1)));
        requestData1.put(tp2, new FetchRequest.PartitionData(10L, 0L, 100, Optional.of(2)));
        FetchContext context1 = fetchManager.newContext(FetchMetadata.INITIAL, requestData1, EMPTY_PART_LIST(), false);
        Map epochs1 = cachedLeaderEpochs$1(context1);
        Assertions.assertEquals(Optional.empty(), epochs1.apply(tp0));
        Assertions.assertEquals(Optional.of(1), epochs1.apply(tp1));
        Assertions.assertEquals(Optional.of(2), epochs1.apply(tp2));

        // Response used to complete requests 1 and 2; its session id drives the later requests.
        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
        responseData.put(tp0, new FetchResponseData.PartitionData()
            .setPartitionIndex(tp0.partition())
            .setHighWatermark(100L)
            .setLastStableOffset(100L)
            .setLogStartOffset(100L));
        responseData.put(tp1, new FetchResponseData.PartitionData()
            .setPartitionIndex(tp1.partition())
            .setHighWatermark(10L)
            .setLastStableOffset(10L)
            .setLogStartOffset(10L));
        responseData.put(tp2, new FetchResponseData.PartitionData()
            .setPartitionIndex(tp2.partition())
            .setHighWatermark(5L)
            .setLastStableOffset(5L)
            .setLogStartOffset(5L));
        int sessionId = context1.updateAndGenerateResponseData(responseData).sessionId();

        // Request 2: no partition data, so the epochs from request 1 must still be reported.
        FetchContext context2 = fetchManager.newContext(
            new FetchMetadata(sessionId, 1), new LinkedHashMap<>(), EMPTY_PART_LIST(), false);
        Map epochs2 = cachedLeaderEpochs$1(context2);
        // Fixed: this used to read tp0 from epochs1, leaving the second context's value unchecked.
        Assertions.assertEquals(Optional.empty(), epochs2.apply(tp0));
        Assertions.assertEquals(Optional.of(1), epochs2.apply(tp1));
        Assertions.assertEquals(Optional.of(2), epochs2.apply(tp2));
        context2.updateAndGenerateResponseData(responseData).sessionId();

        // Request 3: every partition sends a different epoch than before.
        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> requestData3 = new LinkedHashMap<>();
        requestData3.put(tp0, new FetchRequest.PartitionData(0L, 0L, 100, Optional.of(6)));
        requestData3.put(tp1, new FetchRequest.PartitionData(10L, 0L, 100, Optional.empty()));
        requestData3.put(tp2, new FetchRequest.PartitionData(10L, 0L, 100, Optional.of(3)));
        FetchContext context3 = fetchManager.newContext(
            new FetchMetadata(sessionId, 2), requestData3, EMPTY_PART_LIST(), false);
        Map epochs3 = cachedLeaderEpochs$1(context3);
        Assertions.assertEquals(Optional.of(6), epochs3.apply(tp0));
        Assertions.assertEquals(Optional.empty(), epochs3.apply(tp1));
        Assertions.assertEquals(Optional.of(3), epochs3.apply(tp2));
    }

    /**
     * Checks the maps returned by {@code cachedLeaderEpochs$2} and
     * {@code cachedLastFetchedEpochs$1} for three consecutive contexts created through the
     * same {@code FetchManager}: an initial request, a follow-up with no partition data
     * (which must report the same values), and a request that sends new values for all
     * three partitions.
     *
     * <p>Expected maps are Scala immutable maps built from {@code Tuple2} pairs so they can
     * be compared directly with what the helper methods return.
     */
    @Test
    public void testLastFetchedEpoch() {
        FetchManager fetchManager = new FetchManager(new MockTime(), new FetchSessionCache(10, 1000L));
        TopicPartition tp0 = new TopicPartition("foo", 0);
        TopicPartition tp1 = new TopicPartition("foo", 1);
        TopicPartition tp2 = new TopicPartition("bar", 1);

        // Request 1: the fourth and fifth constructor arguments are the values that the
        // leader-epoch and last-fetched-epoch assertions below read back, respectively.
        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> requestData1 = new LinkedHashMap<>();
        requestData1.put(tp0, new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty(), Optional.empty()));
        requestData1.put(tp1, new FetchRequest.PartitionData(10L, 0L, 100, Optional.of(1), Optional.empty()));
        requestData1.put(tp2, new FetchRequest.PartitionData(10L, 0L, 100, Optional.of(2), Optional.of(1)));
        FetchContext context1 = fetchManager.newContext(FetchMetadata.INITIAL, requestData1, EMPTY_PART_LIST(), false);

        // The same expectations apply to request 1 and to the data-less request 2.
        Object initialLeaderEpochs = Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(tp0, Optional.empty()),
            new Tuple2<>(tp1, Optional.of(1)),
            new Tuple2<>(tp2, Optional.of(2))}));
        Object initialLastFetchedEpochs = Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(tp0, Optional.empty()),
            new Tuple2<>(tp1, Optional.empty()),
            new Tuple2<>(tp2, Optional.of(1))}));
        Assertions.assertEquals(initialLeaderEpochs, cachedLeaderEpochs$2(context1));
        Assertions.assertEquals(initialLastFetchedEpochs, cachedLastFetchedEpochs$1(context1));

        // Response used to complete requests 1 and 2; its session id drives the later requests.
        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
        responseData.put(tp0, new FetchResponseData.PartitionData()
            .setPartitionIndex(tp0.partition())
            .setHighWatermark(100L)
            .setLastStableOffset(100L)
            .setLogStartOffset(100L));
        responseData.put(tp1, new FetchResponseData.PartitionData()
            .setPartitionIndex(tp1.partition())
            .setHighWatermark(10L)
            .setLastStableOffset(10L)
            .setLogStartOffset(10L));
        responseData.put(tp2, new FetchResponseData.PartitionData()
            .setPartitionIndex(tp2.partition())
            .setHighWatermark(5L)
            .setLastStableOffset(5L)
            .setLogStartOffset(5L));
        int sessionId = context1.updateAndGenerateResponseData(responseData).sessionId();

        // Request 2: no partition data, so both maps must be unchanged.
        FetchContext context2 = fetchManager.newContext(
            new FetchMetadata(sessionId, 1), new LinkedHashMap<>(), EMPTY_PART_LIST(), false);
        Assertions.assertEquals(initialLeaderEpochs, cachedLeaderEpochs$2(context2));
        Assertions.assertEquals(initialLastFetchedEpochs, cachedLastFetchedEpochs$1(context2));
        context2.updateAndGenerateResponseData(responseData).sessionId();

        // Request 3: every partition sends new values.
        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> requestData3 = new LinkedHashMap<>();
        requestData3.put(tp0, new FetchRequest.PartitionData(0L, 0L, 100, Optional.of(6), Optional.of(5)));
        requestData3.put(tp1, new FetchRequest.PartitionData(10L, 0L, 100, Optional.empty(), Optional.empty()));
        requestData3.put(tp2, new FetchRequest.PartitionData(10L, 0L, 100, Optional.of(3), Optional.of(3)));
        FetchContext context3 = fetchManager.newContext(
            new FetchMetadata(sessionId, 2), requestData3, EMPTY_PART_LIST(), false);

        Object updatedLeaderEpochs = Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(tp0, Optional.of(6)),
            new Tuple2<>(tp1, Optional.empty()),
            new Tuple2<>(tp2, Optional.of(3))}));
        Object updatedLastFetchedEpochs = Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(tp0, Optional.of(5)),
            new Tuple2<>(tp1, Optional.empty()),
            new Tuple2<>(tp2, Optional.of(3))}));
        Assertions.assertEquals(updatedLeaderEpochs, cachedLeaderEpochs$2(context3));
        // Fixed: this used to inspect context2, which was created before request 3 was sent;
        // the values from request 3 are now checked on the context built from that request.
        Assertions.assertEquals(updatedLastFetchedEpochs, cachedLastFetchedEpochs$1(context3));
    }

    /**
     * Drives a {@code FetchManager} through a sequence of requests and checks, for each
     * one, the concrete {@code FetchContext} class returned by {@code newContext} and the
     * error code, session id and payload of the generated response. The sequence covers a
     * legacy request, an initial request, requests with an unexpected epoch or an unknown
     * session id, incremental requests, a throttled response, and finally requests sent
     * with epoch -1.
     */
    @Test
    public void testFetchRequests() {
        MockTime time = new MockTime();
        FetchSessionCache cache = new FetchSessionCache(10, 1000L);
        FetchManager fetchManager = new FetchManager(time, cache);
        TopicPartition foo0 = new TopicPartition("foo", 0);
        TopicPartition foo1 = new TopicPartition("foo", 1);

        // LEGACY metadata yields a sessionless context.
        FetchContext legacyContext =
            fetchManager.newContext(FetchMetadata.LEGACY, new HashMap<>(), EMPTY_PART_LIST(), true);
        Assertions.assertEquals(SessionlessFetchContext.class, legacyContext.getClass());

        // INITIAL metadata yields a full fetch context.
        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> requestData = new LinkedHashMap<>();
        requestData.put(foo0, new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        requestData.put(foo1, new FetchRequest.PartitionData(10L, 0L, 100, Optional.empty()));
        FetchContext fullContext =
            fetchManager.newContext(FetchMetadata.INITIAL, requestData, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(FullFetchContext.class, fullContext.getClass());
        Iterator fullIter = requestData.entrySet().iterator();
        fullContext.foreachPartition((tp, data) -> {
            $anonfun$testFetchRequests$1(fullIter, tp, data);
            return BoxedUnit.UNIT;
        });
        Assertions.assertEquals(0L, BoxesRunTime.unboxToLong(fullContext.getFetchOffset(foo0).get()));
        Assertions.assertEquals(10L, BoxesRunTime.unboxToLong(fullContext.getFetchOffset(foo1).get()));

        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
        responseData.put(foo0, new FetchResponseData.PartitionData()
            .setPartitionIndex(0)
            .setHighWatermark(100L)
            .setLastStableOffset(100L)
            .setLogStartOffset(100L));
        responseData.put(foo1, new FetchResponseData.PartitionData()
            .setPartitionIndex(1)
            .setHighWatermark(10L)
            .setLastStableOffset(10L)
            .setLogStartOffset(10L));
        FetchResponse fullResponse = fullContext.updateAndGenerateResponseData(responseData);
        Assertions.assertEquals(Errors.NONE, fullResponse.error());
        Assertions.assertTrue(fullResponse.sessionId() != 0);
        Assertions.assertEquals(responseData, fullResponse.responseData());
        int establishedId = fullResponse.sessionId();

        // Epoch 5 on the established session is rejected.
        FetchContext badEpochContext = fetchManager.newContext(
            new FetchMetadata(establishedId, 5), requestData, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(SessionErrorContext.class, badEpochContext.getClass());
        Assertions.assertEquals(
            Errors.INVALID_FETCH_SESSION_EPOCH,
            badEpochContext.updateAndGenerateResponseData(responseData).error());

        // A session id that was never handed out is rejected.
        FetchContext unknownIdContext = fetchManager.newContext(
            new FetchMetadata(establishedId + 1, 1), requestData, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(SessionErrorContext.class, unknownIdContext.getClass());
        Assertions.assertEquals(
            Errors.FETCH_SESSION_ID_NOT_FOUND,
            unknownIdContext.updateAndGenerateResponseData(responseData).error());

        // Epoch 1 with no partition data yields an incremental context.
        FetchContext incrementalContext = fetchManager.newContext(
            new FetchMetadata(establishedId, 1), new LinkedHashMap<>(), EMPTY_PART_LIST(), false);
        Assertions.assertEquals(IncrementalFetchContext.class, incrementalContext.getClass());
        Iterator incrementalIter = requestData.entrySet().iterator();
        incrementalContext.foreachPartition((tp, data) -> {
            $anonfun$testFetchRequests$2(incrementalIter, tp, data);
            return BoxedUnit.UNIT;
        });
        Assertions.assertEquals(10L, BoxesRunTime.unboxToLong(incrementalContext.getFetchOffset(foo1).get()));
        FetchResponse incrementalResponse = incrementalContext.updateAndGenerateResponseData(responseData);
        Assertions.assertEquals(Errors.NONE, incrementalResponse.error());
        Assertions.assertEquals(establishedId, incrementalResponse.sessionId());
        Assertions.assertEquals(0, incrementalResponse.responseData().size());

        // Epoch 5 is still rejected after the incremental request.
        FetchContext badEpochAgain = fetchManager.newContext(
            new FetchMetadata(establishedId, 5), requestData, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(SessionErrorContext.class, badEpochAgain.getClass());
        Assertions.assertEquals(
            Errors.INVALID_FETCH_SESSION_EPOCH,
            badEpochAgain.updateAndGenerateResponseData(responseData).error());

        // A throttled response for epoch 2 keeps the session id and carries the throttle time.
        FetchContext throttledContext = fetchManager.newContext(
            new FetchMetadata(establishedId, 2), new LinkedHashMap<>(), EMPTY_PART_LIST(), false);
        FetchResponse throttledResponse = throttledContext.getThrottledResponse(100);
        Assertions.assertEquals(Errors.NONE, throttledResponse.error());
        Assertions.assertEquals(establishedId, throttledResponse.sessionId());
        Assertions.assertEquals(100, throttledResponse.throttleTimeMs());

        // Requests with epoch -1: the context is sessionless and the cache is empty afterwards.
        // Repeat for as long as the response echoes the previous session id.
        int previousId = incrementalResponse.sessionId();
        TopicPartition bar0 = new TopicPartition("bar", 0);
        TopicPartition bar1 = new TopicPartition("bar", 1);
        int nextId;
        do {
            LinkedHashMap<TopicPartition, FetchRequest.PartitionData> closingRequest = new LinkedHashMap<>();
            closingRequest.put(bar0, new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
            closingRequest.put(bar1, new FetchRequest.PartitionData(10L, 0L, 100, Optional.empty()));
            FetchContext closingContext = fetchManager.newContext(
                new FetchMetadata(previousId, -1), closingRequest, EMPTY_PART_LIST(), false);
            Assertions.assertEquals(SessionlessFetchContext.class, closingContext.getClass());
            Assertions.assertEquals(0, cache.size());

            LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> closingData = new LinkedHashMap<>();
            closingData.put(bar0, new FetchResponseData.PartitionData()
                .setPartitionIndex(0)
                .setHighWatermark(100L)
                .setLastStableOffset(100L)
                .setLogStartOffset(100L));
            closingData.put(bar1, new FetchResponseData.PartitionData()
                .setPartitionIndex(1)
                .setHighWatermark(100L)
                .setLastStableOffset(100L)
                .setLogStartOffset(100L));
            FetchResponse closingResponse = closingContext.updateAndGenerateResponseData(closingData);
            Assertions.assertEquals(Errors.NONE, closingResponse.error());
            nextId = closingResponse.sessionId();
        } while (nextId == previousId);
    }

    /**
     * Establishes a session with foo-0 and foo-1, then sends a second request (session
     * epoch 1) that adds bar-0 and lists foo-0 in the third {@code newContext} argument.
     * Verifies that the second context is incremental, that foo-0 no longer has a fetch
     * offset while foo-1 and bar-0 do, and that the second response carries one partition.
     */
    @Test
    public void testIncrementalFetchSession() {
        FetchManager fetchManager = new FetchManager(new MockTime(), new FetchSessionCache(10, 1000L));
        TopicPartition foo0 = new TopicPartition("foo", 0);
        TopicPartition foo1 = new TopicPartition("foo", 1);
        TopicPartition bar0 = new TopicPartition("bar", 0);

        // Initial request creates the session.
        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> requestData1 = new LinkedHashMap<>();
        requestData1.put(foo0, new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        requestData1.put(foo1, new FetchRequest.PartitionData(10L, 0L, 100, Optional.empty()));
        FetchContext context1 =
            fetchManager.newContext(FetchMetadata.INITIAL, requestData1, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(FullFetchContext.class, context1.getClass());

        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData1 = new LinkedHashMap<>();
        responseData1.put(foo0, new FetchResponseData.PartitionData()
            .setPartitionIndex(0)
            .setHighWatermark(100L)
            .setLastStableOffset(100L)
            .setLogStartOffset(100L));
        responseData1.put(foo1, new FetchResponseData.PartitionData()
            .setPartitionIndex(1)
            .setHighWatermark(10L)
            .setLastStableOffset(10L)
            .setLogStartOffset(10L));
        FetchResponse response1 = context1.updateAndGenerateResponseData(responseData1);
        Assertions.assertEquals(Errors.NONE, response1.error());
        Assertions.assertTrue(response1.sessionId() != 0);
        Assertions.assertEquals(2, response1.responseData().size());

        // Second request: bar-0 is sent as partition data, foo-0 goes in the separate list.
        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> requestData2 = new LinkedHashMap<>();
        requestData2.put(bar0, new FetchRequest.PartitionData(15L, 0L, 0, Optional.empty()));
        ArrayList<TopicPartition> removed = new ArrayList<>();
        removed.add(foo0);
        FetchContext context2 = fetchManager.newContext(
            new FetchMetadata(response1.sessionId(), 1), requestData2, removed, false);
        Assertions.assertEquals(IncrementalFetchContext.class, context2.getClass());

        // The helper receives an iterator over {foo-1, bar-0} together with each visited partition.
        Set expectedPartitions = (Set) Predef$.MODULE$.Set().apply(
            ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{foo1, bar0}));
        scala.collection.Iterator expectedIter = expectedPartitions.iterator();
        context2.foreachPartition((tp, data) -> {
            $anonfun$testIncrementalFetchSession$1(expectedIter, tp, data);
            return BoxedUnit.UNIT;
        });
        Assertions.assertEquals(None$.MODULE$, context2.getFetchOffset(foo0));
        Assertions.assertEquals(10L, BoxesRunTime.unboxToLong(context2.getFetchOffset(foo1).get()));
        Assertions.assertEquals(15L, BoxesRunTime.unboxToLong(context2.getFetchOffset(bar0).get()));
        Assertions.assertEquals(None$.MODULE$, context2.getFetchOffset(new TopicPartition("bar", 2)));

        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData2 = new LinkedHashMap<>();
        responseData2.put(foo1, new FetchResponseData.PartitionData()
            .setPartitionIndex(1)
            .setHighWatermark(10L)
            .setLastStableOffset(10L)
            .setLogStartOffset(10L));
        responseData2.put(bar0, new FetchResponseData.PartitionData()
            .setPartitionIndex(0)
            .setHighWatermark(10L)
            .setLastStableOffset(10L)
            .setLogStartOffset(10L));
        FetchResponse response2 = context2.updateAndGenerateResponseData(responseData2);
        Assertions.assertEquals(Errors.NONE, response2.error());
        Assertions.assertEquals(1, response2.responseData().size());
        Assertions.assertTrue(response2.sessionId() > 0);
    }

    /**
     * Checks which session is dropped from a full cache when a third session is created.
     *
     * <p>Two sessions are created 500 ms of mock time apart in a cache built with
     * {@code new FetchSessionCache(2, 1000L)}. Session 1 is then touched through an incremental
     * request, more mock time elapses, and a third session is created. The test expects
     * session 2 to be gone while sessions 1 and 3 are still cached.
     */
    @Test
    public void testFetchSessionExpiration() {
        MockTime time = new MockTime();
        // NOTE(review): the arguments are presumably (max entries, eviction ms) — confirm against FetchSessionCache.
        FetchSessionCache cache = new FetchSessionCache(2, 1000L);
        FetchManager fetchManager = new FetchManager(time, cache);

        // Session 1.
        LinkedHashMap session1Request = new LinkedHashMap();
        session1Request.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        session1Request.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10L, 0L, 100, Optional.empty()));
        FetchContext session1Context = fetchManager.newContext(FetchMetadata.INITIAL, session1Request, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(FullFetchContext.class, session1Context.getClass());
        LinkedHashMap session1Response = new LinkedHashMap();
        session1Response.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData().setPartitionIndex(0).setHighWatermark(100L).setLastStableOffset(100L).setLogStartOffset(100L));
        session1Response.put(new TopicPartition("foo", 1), new FetchResponseData.PartitionData().setPartitionIndex(1).setHighWatermark(10L).setLastStableOffset(10L).setLogStartOffset(10L));
        FetchResponse session1Result = session1Context.updateAndGenerateResponseData(session1Response);
        Assertions.assertEquals(Errors.NONE, session1Result.error());
        Assertions.assertTrue(session1Result.sessionId() != 0);
        Assertions.assertEquals(2, session1Result.responseData().size());
        Assertions.assertTrue(cache.get(session1Result.sessionId()).isDefined());

        time.sleep(500L);

        // Session 2. It is driven with its own request/response maps; previously these maps were
        // built but session 1's maps were passed instead, leaving them as dead locals.
        LinkedHashMap session2Request = new LinkedHashMap();
        session2Request.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        session2Request.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10L, 0L, 100, Optional.empty()));
        FetchContext session2Context = fetchManager.newContext(FetchMetadata.INITIAL, session2Request, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(FullFetchContext.class, session2Context.getClass());
        LinkedHashMap session2Response = new LinkedHashMap();
        session2Response.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData().setPartitionIndex(0).setHighWatermark(100L).setLastStableOffset(100L).setLogStartOffset(100L));
        session2Response.put(new TopicPartition("foo", 1), new FetchResponseData.PartitionData().setPartitionIndex(1).setHighWatermark(10L).setLastStableOffset(10L).setLogStartOffset(10L));
        FetchResponse session2Result = session2Context.updateAndGenerateResponseData(session2Response);
        Assertions.assertEquals(Errors.NONE, session2Result.error());
        Assertions.assertTrue(session2Result.sessionId() != 0);
        Assertions.assertEquals(2, session2Result.responseData().size());
        Assertions.assertTrue(cache.get(session1Result.sessionId()).isDefined());
        Assertions.assertTrue(cache.get(session2Result.sessionId()).isDefined());

        time.sleep(500L);

        // Touch session 1 with an (empty) incremental request so that it has been used more
        // recently than session 2.
        FetchContext session1Update = fetchManager.newContext(new FetchMetadata(session1Result.sessionId(), 1), new LinkedHashMap(), new ArrayList(), false);
        Assertions.assertEquals(IncrementalFetchContext.class, session1Update.getClass());

        time.sleep(501L);

        // Session 3: the cache already holds two sessions, so one of them has to go.
        LinkedHashMap session3Request = new LinkedHashMap();
        session3Request.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        session3Request.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        FetchContext session3Context = fetchManager.newContext(FetchMetadata.INITIAL, session3Request, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(FullFetchContext.class, session3Context.getClass());
        LinkedHashMap session3Response = new LinkedHashMap();
        session3Response.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData().setPartitionIndex(0).setHighWatermark(100L).setLastStableOffset(100L).setLogStartOffset(100L));
        session3Response.put(new TopicPartition("foo", 1), new FetchResponseData.PartitionData().setPartitionIndex(1).setHighWatermark(10L).setLastStableOffset(10L).setLogStartOffset(10L));
        FetchResponse session3Result = session3Context.updateAndGenerateResponseData(session3Response);
        Assertions.assertEquals(Errors.NONE, session3Result.error());
        Assertions.assertTrue(session3Result.sessionId() != 0);
        Assertions.assertEquals(2, session3Result.responseData().size());
        Assertions.assertTrue(cache.get(session1Result.sessionId()).isDefined());
        Assertions.assertFalse(cache.get(session2Result.sessionId()).isDefined(), "session 2 should have been evicted by latest session, as session 1 was used more recently");
        Assertions.assertTrue(cache.get(session3Result.sessionId()).isDefined());
    }

    /**
     * Exercises cache eviction when sessions are created with different values of the final
     * {@code newContext} flag.
     *
     * <p>Sessions 1, 3 and 4 are created with the flag set to {@code true}, session 2 with
     * {@code false}. The assertion messages refer to session 1 as "privileged".
     * NOTE(review): the flag is presumably "is from follower" — confirm against FetchManager.
     *
     * <p>Expected outcome, as asserted below: session 3 evicts session 2, and after a further
     * 501 ms of mock time session 4 evicts session 1. The cache never holds more than two
     * sessions.
     */
    @Test
    public void testPrivilegedSessionHandling() {
        MockTime time = new MockTime();
        FetchSessionCache cache = new FetchSessionCache(2, 1000L);
        FetchManager fetchManager = new FetchManager(time, cache);

        // Session 1 (flag = true).
        LinkedHashMap session1Request = new LinkedHashMap();
        session1Request.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        session1Request.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10L, 0L, 100, Optional.empty()));
        FetchContext session1Context = fetchManager.newContext(FetchMetadata.INITIAL, session1Request, EMPTY_PART_LIST(), true);
        Assertions.assertEquals(FullFetchContext.class, session1Context.getClass());
        LinkedHashMap session1Response = new LinkedHashMap();
        session1Response.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData().setPartitionIndex(0).setHighWatermark(100L).setLastStableOffset(100L).setLogStartOffset(100L));
        session1Response.put(new TopicPartition("foo", 1), new FetchResponseData.PartitionData().setPartitionIndex(1).setHighWatermark(10L).setLastStableOffset(10L).setLogStartOffset(10L));
        FetchResponse session1Result = session1Context.updateAndGenerateResponseData(session1Response);
        Assertions.assertEquals(Errors.NONE, session1Result.error());
        Assertions.assertTrue(session1Result.sessionId() != 0);
        Assertions.assertEquals(2, session1Result.responseData().size());
        Assertions.assertEquals(1, cache.size());

        time.sleep(500L);

        // Session 2 (flag = false). It is driven with its own request/response maps; previously
        // these maps were built but session 1's maps were passed instead.
        LinkedHashMap session2Request = new LinkedHashMap();
        session2Request.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        session2Request.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10L, 0L, 100, Optional.empty()));
        FetchContext session2Context = fetchManager.newContext(FetchMetadata.INITIAL, session2Request, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(FullFetchContext.class, session2Context.getClass());
        LinkedHashMap session2Response = new LinkedHashMap();
        session2Response.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData().setPartitionIndex(0).setHighWatermark(100L).setLastStableOffset(100L).setLogStartOffset(100L));
        session2Response.put(new TopicPartition("foo", 1), new FetchResponseData.PartitionData().setPartitionIndex(1).setHighWatermark(10L).setLastStableOffset(10L).setLogStartOffset(10L));
        FetchResponse session2Result = session2Context.updateAndGenerateResponseData(session2Response);
        Assertions.assertEquals(Errors.NONE, session2Result.error());
        Assertions.assertTrue(session2Result.sessionId() != 0);
        Assertions.assertEquals(2, session2Result.responseData().size());
        Assertions.assertTrue(cache.get(session1Result.sessionId()).isDefined());
        Assertions.assertTrue(cache.get(session2Result.sessionId()).isDefined());
        Assertions.assertEquals(2, cache.size());

        time.sleep(500L);

        // Session 3 (flag = true): expected to push out session 2.
        LinkedHashMap session3Request = new LinkedHashMap();
        session3Request.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        session3Request.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        FetchContext session3Context = fetchManager.newContext(FetchMetadata.INITIAL, session3Request, EMPTY_PART_LIST(), true);
        Assertions.assertEquals(FullFetchContext.class, session3Context.getClass());
        LinkedHashMap session3Response = new LinkedHashMap();
        session3Response.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData().setPartitionIndex(0).setHighWatermark(100L).setLastStableOffset(100L).setLogStartOffset(100L));
        session3Response.put(new TopicPartition("foo", 1), new FetchResponseData.PartitionData().setPartitionIndex(1).setHighWatermark(10L).setLastStableOffset(10L).setLogStartOffset(10L));
        FetchResponse session3Result = session3Context.updateAndGenerateResponseData(session3Response);
        Assertions.assertEquals(Errors.NONE, session3Result.error());
        Assertions.assertTrue(session3Result.sessionId() != 0);
        Assertions.assertEquals(2, session3Result.responseData().size());
        Assertions.assertTrue(cache.get(session1Result.sessionId()).isDefined());
        Assertions.assertFalse(cache.get(session2Result.sessionId()).isDefined(), "session 2 should have been evicted by session 3");
        Assertions.assertTrue(cache.get(session3Result.sessionId()).isDefined());
        Assertions.assertEquals(2, cache.size());

        time.sleep(501L);

        // Session 4 (flag = true). The response is generated from session 4's own context;
        // previously the context was discarded and session 3's context was reused here.
        LinkedHashMap session4Request = new LinkedHashMap();
        session4Request.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        session4Request.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        FetchContext session4Context = fetchManager.newContext(FetchMetadata.INITIAL, session4Request, EMPTY_PART_LIST(), true);
        Assertions.assertEquals(FullFetchContext.class, session4Context.getClass());
        LinkedHashMap session4Response = new LinkedHashMap();
        session4Response.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData().setPartitionIndex(0).setHighWatermark(100L).setLastStableOffset(100L).setLogStartOffset(100L));
        session4Response.put(new TopicPartition("foo", 1), new FetchResponseData.PartitionData().setPartitionIndex(1).setHighWatermark(10L).setLastStableOffset(10L).setLogStartOffset(10L));
        FetchResponse session4Result = session4Context.updateAndGenerateResponseData(session4Response);
        Assertions.assertEquals(Errors.NONE, session4Result.error());
        Assertions.assertTrue(session4Result.sessionId() != 0);
        Assertions.assertEquals(2, session4Result.responseData().size());
        Assertions.assertFalse(cache.get(session1Result.sessionId()).isDefined(), "session 1 should have been evicted by session 4 even though it is privileged as it has hit eviction time");
        Assertions.assertTrue(cache.get(session3Result.sessionId()).isDefined());
        Assertions.assertTrue(cache.get(session4Result.sessionId()).isDefined());
        Assertions.assertEquals(2, cache.size());
    }

    /**
     * Creates a session over two partitions, then issues an incremental request that lists both
     * partitions as "to forget". The test expects a {@code SessionlessFetchContext}, a response
     * with session id 0 and no partition data, and an empty cache afterwards.
     */
    @Test
    public void testZeroSizeFetchSession() {
        MockTime time = new MockTime();
        FetchSessionCache cache = new FetchSessionCache(10, 1000L);
        FetchManager manager = new FetchManager(time, cache);

        // Step 1: full fetch request that establishes the session.
        LinkedHashMap initialRequest = new LinkedHashMap();
        initialRequest.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0L, 0L, 100, Optional.empty()));
        initialRequest.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10L, 0L, 100, Optional.empty()));
        FetchContext fullContext = manager.newContext(FetchMetadata.INITIAL, initialRequest, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(FullFetchContext.class, fullContext.getClass());

        LinkedHashMap initialResponse = new LinkedHashMap();
        initialResponse.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData().setPartitionIndex(0).setHighWatermark(100L).setLastStableOffset(100L).setLogStartOffset(100L));
        initialResponse.put(new TopicPartition("foo", 1), new FetchResponseData.PartitionData().setPartitionIndex(1).setHighWatermark(10L).setLastStableOffset(10L).setLogStartOffset(10L));
        FetchResponse created = fullContext.updateAndGenerateResponseData(initialResponse);
        Assertions.assertEquals(Errors.NONE, created.error());
        Assertions.assertTrue(created.sessionId() != 0);
        Assertions.assertEquals(2, created.responseData().size());

        // Step 2: incremental request with nothing to fetch and every partition forgotten.
        LinkedHashMap nothingToFetch = new LinkedHashMap();
        ArrayList forgotten = new ArrayList();
        forgotten.add(new TopicPartition("foo", 0));
        forgotten.add(new TopicPartition("foo", 1));
        FetchMetadata followUp = new FetchMetadata(created.sessionId(), 1);
        FetchContext emptiedContext = manager.newContext(followUp, nothingToFetch, forgotten, false);
        Assertions.assertEquals(SessionlessFetchContext.class, emptiedContext.getClass());

        FetchResponse closed = emptiedContext.updateAndGenerateResponseData(new LinkedHashMap());
        Assertions.assertEquals(0, closed.sessionId());
        Assertions.assertTrue(closed.responseData().isEmpty());
        Assertions.assertEquals(0, cache.size());
    }

    /**
     * Checks which partitions an incremental fetch response contains when a diverging epoch is
     * set on some of them.
     *
     * <p>The same response map is handed to {@code updateAndGenerateResponseData} several times
     * and re-populated in between, so the statement order below is significant.
     */
    @Test
    public void testDivergingEpoch() {
        MockTime time = new MockTime();
        FetchSessionCache cache = new FetchSessionCache(10, 1000L);
        FetchManager manager = new FetchManager(time, cache);
        TopicPartition tp1 = new TopicPartition("foo", 1);
        TopicPartition tp2 = new TopicPartition("bar", 2);

        // Full fetch for both partitions.
        LinkedHashMap request = new LinkedHashMap();
        request.put(tp1, new FetchRequest.PartitionData(100L, 0L, 1000, Optional.of(5), Optional.of(4)));
        request.put(tp2, new FetchRequest.PartitionData(100L, 0L, 1000, Optional.of(5), Optional.of(4)));
        FetchContext fullContext = manager.newContext(FetchMetadata.INITIAL, request, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(FullFetchContext.class, fullContext.getClass());

        // Only tp2 carries a diverging epoch in the first response.
        LinkedHashMap response = new LinkedHashMap();
        response.put(tp1, new FetchResponseData.PartitionData().setPartitionIndex(tp1.partition()).setHighWatermark(105L).setLastStableOffset(105L).setLogStartOffset(0L));
        FetchResponseData.EpochEndOffset diverging = new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(90L);
        response.put(tp2, new FetchResponseData.PartitionData().setPartitionIndex(tp2.partition()).setHighWatermark(105L).setLastStableOffset(105L).setLogStartOffset(0L).setDivergingEpoch(diverging));
        FetchResponse first = fullContext.updateAndGenerateResponseData(response);
        Assertions.assertEquals(Errors.NONE, first.error());
        Assertions.assertNotEquals(0, first.sessionId());
        Assertions.assertEquals(Utils.mkSet(tp1, tp2), first.responseData().keySet());

        // Incremental fetch on the same session: only tp2 is expected back.
        FetchContext incremental = manager.newContext(new FetchMetadata(first.sessionId(), 1), request, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(IncrementalFetchContext.class, incremental.getClass());
        FetchResponse second = incremental.updateAndGenerateResponseData(response);
        Assertions.assertEquals(Errors.NONE, second.error());
        Assertions.assertEquals(first.sessionId(), second.sessionId());
        Assertions.assertEquals(Collections.singleton(tp2), second.responseData().keySet());

        // tp1 now also carries the diverging epoch: both partitions are expected back.
        response.put(tp1, new FetchResponseData.PartitionData().setPartitionIndex(tp1.partition()).setHighWatermark(105L).setLastStableOffset(105L).setLogStartOffset(0L).setDivergingEpoch(diverging));
        FetchResponse third = incremental.updateAndGenerateResponseData(response);
        Assertions.assertEquals(Errors.NONE, third.error());
        Assertions.assertEquals(first.sessionId(), third.sessionId());
        Assertions.assertEquals(Utils.mkSet(tp1, tp2), third.responseData().keySet());

        // tp1 without a diverging epoch but with a changed high watermark: both are still expected back.
        response.put(tp1, new FetchResponseData.PartitionData().setPartitionIndex(tp1.partition()).setHighWatermark(110L).setLastStableOffset(110L).setLogStartOffset(0L));
        FetchResponse fourth = incremental.updateAndGenerateResponseData(response);
        Assertions.assertEquals(Errors.NONE, fourth.error());
        Assertions.assertEquals(first.sessionId(), fourth.sessionId());
        Assertions.assertEquals(Utils.mkSet(tp1, tp2), fourth.responseData().keySet());
    }

    /**
     * Checks the partition iteration order of an incremental fetch context before and after a
     * response in which only one partition ("bar"-2) carries records.
     *
     * <p>Expected order before that response: foo-1, bar-2, zar-3. Expected order afterwards:
     * foo-1, zar-3, bar-2.
     */
    @Test
    public void testDeprioritizesPartitionsWithRecordsOnly() {
        MockTime time = new MockTime();
        FetchSessionCache cache = new FetchSessionCache(10, 1000L);
        FetchManager manager = new FetchManager(time, cache);
        TopicPartition tp1 = new TopicPartition("foo", 1);
        TopicPartition tp2 = new TopicPartition("bar", 2);
        TopicPartition tp3 = new TopicPartition("zar", 3);

        // Full fetch over all three partitions.
        LinkedHashMap request = new LinkedHashMap();
        request.put(tp1, new FetchRequest.PartitionData(100L, 0L, 1000, Optional.of(5), Optional.of(4)));
        request.put(tp2, new FetchRequest.PartitionData(100L, 0L, 1000, Optional.of(5), Optional.of(4)));
        request.put(tp3, new FetchRequest.PartitionData(100L, 0L, 1000, Optional.of(5), Optional.of(4)));
        FetchContext fullContext = manager.newContext(FetchMetadata.INITIAL, request, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(FullFetchContext.class, fullContext.getClass());

        LinkedHashMap firstData = new LinkedHashMap();
        firstData.put(tp1, new FetchResponseData.PartitionData().setPartitionIndex(tp1.partition()).setHighWatermark(50L).setLastStableOffset(50L).setLogStartOffset(0L));
        firstData.put(tp2, new FetchResponseData.PartitionData().setPartitionIndex(tp2.partition()).setHighWatermark(50L).setLastStableOffset(50L).setLogStartOffset(0L));
        firstData.put(tp3, new FetchResponseData.PartitionData().setPartitionIndex(tp3.partition()).setHighWatermark(50L).setLastStableOffset(50L).setLogStartOffset(0L));
        FetchResponse first = fullContext.updateAndGenerateResponseData(firstData);
        Assertions.assertEquals(Errors.NONE, first.error());
        Assertions.assertNotEquals(0, first.sessionId());
        Assertions.assertEquals(Utils.mkSet(tp1, tp2, tp3), first.responseData().keySet());

        // Incremental fetch: the iteration order initially follows the request order.
        FetchContext incremental = manager.newContext(new FetchMetadata(first.sessionId(), 1), request, EMPTY_PART_LIST(), false);
        Assertions.assertEquals(IncrementalFetchContext.class, incremental.getClass());
        assertPartitionsOrder(incremental, new $colon.colon(tp1, new $colon.colon(tp2, new $colon.colon(tp3, Nil$.MODULE$))));

        // An empty update yields an empty response on the same session.
        FetchResponse second = incremental.updateAndGenerateResponseData(new LinkedHashMap());
        Assertions.assertEquals(Errors.NONE, second.error());
        Assertions.assertEquals(first.sessionId(), second.sessionId());
        Assertions.assertEquals(Collections.emptySet(), second.responseData().keySet());

        // tp1: high watermark change only; tp2: high watermark change plus records; tp3: unchanged.
        LinkedHashMap thirdData = new LinkedHashMap();
        thirdData.put(tp1, new FetchResponseData.PartitionData().setPartitionIndex(tp1.partition()).setHighWatermark(60L).setLastStableOffset(50L).setLogStartOffset(0L));
        thirdData.put(tp2, new FetchResponseData.PartitionData().setPartitionIndex(tp2.partition()).setHighWatermark(60L).setLastStableOffset(50L).setLogStartOffset(0L).setRecords(MemoryRecords.withRecords((byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, new SimpleRecord[]{new SimpleRecord(100L, (byte[]) null)})));
        thirdData.put(tp3, new FetchResponseData.PartitionData().setPartitionIndex(tp3.partition()).setHighWatermark(50L).setLastStableOffset(50L).setLogStartOffset(0L));
        FetchResponse third = incremental.updateAndGenerateResponseData(thirdData);
        Assertions.assertEquals(Errors.NONE, third.error());
        Assertions.assertEquals(first.sessionId(), third.sessionId());
        Assertions.assertEquals(Utils.mkSet(tp1, tp2), third.responseData().keySet());

        // Only tp2 (the partition that returned records) is expected to have moved to the end.
        assertPartitionsOrder(incremental, new $colon.colon(tp1, new $colon.colon(tp3, new $colon.colon(tp2, Nil$.MODULE$))));
    }

    /**
     * Asserts that {@code fetchContext.foreachPartition} visits partitions in exactly the order
     * given by {@code seq}.
     *
     * @param fetchContext context whose partitions are iterated
     * @param seq expected partitions, in expected visiting order
     */
    private void assertPartitionsOrder(FetchContext fetchContext, Seq<TopicPartition> seq) {
        ArrayBuffer visited = ArrayBuffer$.MODULE$.empty();
        // Record every partition in the order the context hands it to the callback.
        fetchContext.foreachPartition((tp, ignoredData) -> {
            visited.$plus$eq(tp);
            return BoxedUnit.UNIT;
        });
        Object actualOrder = visited.toSeq();
        Assertions.assertEquals(seq, actualOrder);
    }

    /** Compiler-generated closure body: appends a (partition -> currentLeaderEpoch) pair to {@code builder}. */
    public static final /* synthetic */ void $anonfun$testCachedLeaderEpoch$1(Builder builder, TopicPartition topicPartition, FetchRequest.PartitionData partitionData) {
        Object key = Predef$.MODULE$.ArrowAssoc(topicPartition);
        Tuple2 entry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(key, partitionData.currentLeaderEpoch);
        builder.$plus$eq(entry);
    }

    /**
     * Builds a Scala map from each partition visited by {@code fetchContext.foreachPartition}
     * to that partition's {@code currentLeaderEpoch}.
     */
    private static final Map cachedLeaderEpochs$1(FetchContext fetchContext) {
        Builder epochsByPartition = Predef$.MODULE$.Map().newBuilder();
        fetchContext.foreachPartition((tp, data) -> {
            $anonfun$testCachedLeaderEpoch$1(epochsByPartition, tp, data);
            return BoxedUnit.UNIT;
        });
        Object built = epochsByPartition.result();
        return (Map) built;
    }

    /** Compiler-generated closure body: appends a (partition -> currentLeaderEpoch) pair to {@code builder}. */
    public static final /* synthetic */ void $anonfun$testLastFetchedEpoch$1(Builder builder, TopicPartition topicPartition, FetchRequest.PartitionData partitionData) {
        Object key = Predef$.MODULE$.ArrowAssoc(topicPartition);
        Tuple2 entry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(key, partitionData.currentLeaderEpoch);
        builder.$plus$eq(entry);
    }

    /**
     * Builds a Scala map from each partition visited by {@code fetchContext.foreachPartition}
     * to that partition's {@code currentLeaderEpoch}.
     */
    private static final Map cachedLeaderEpochs$2(FetchContext fetchContext) {
        Builder epochsByPartition = Predef$.MODULE$.Map().newBuilder();
        fetchContext.foreachPartition((tp, data) -> {
            $anonfun$testLastFetchedEpoch$1(epochsByPartition, tp, data);
            return BoxedUnit.UNIT;
        });
        Object built = epochsByPartition.result();
        return (Map) built;
    }

    /** Compiler-generated closure body: appends a (partition -> lastFetchedEpoch) pair to {@code builder}. */
    public static final /* synthetic */ void $anonfun$testLastFetchedEpoch$2(Builder builder, TopicPartition topicPartition, FetchRequest.PartitionData partitionData) {
        Object key = Predef$.MODULE$.ArrowAssoc(topicPartition);
        Tuple2 entry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(key, partitionData.lastFetchedEpoch);
        builder.$plus$eq(entry);
    }

    /**
     * Builds a Scala map from each partition visited by {@code fetchContext.foreachPartition}
     * to that partition's {@code lastFetchedEpoch}.
     */
    private static final Map cachedLastFetchedEpochs$1(FetchContext fetchContext) {
        Builder epochsByPartition = Predef$.MODULE$.Map().newBuilder();
        fetchContext.foreachPartition((tp, data) -> {
            $anonfun$testLastFetchedEpoch$2(epochsByPartition, tp, data);
            return BoxedUnit.UNIT;
        });
        Object built = epochsByPartition.result();
        return (Map) built;
    }

    /**
     * Compiler-generated closure body: pulls the next expected entry from {@code it} and asserts
     * that its key and value equal the visited partition and its partition data.
     */
    public static final /* synthetic */ void $anonfun$testFetchRequests$1(Iterator it, TopicPartition topicPartition, FetchRequest.PartitionData partitionData) {
        Map.Entry expected = (Map.Entry) it.next();
        Object expectedPartition = expected.getKey();
        Assertions.assertEquals(expectedPartition, topicPartition);
        Object expectedData = expected.getValue();
        Assertions.assertEquals(expectedData, partitionData);
    }

    /**
     * Compiler-generated closure body: pulls the next expected entry from {@code it} and asserts
     * that its key and value equal the visited partition and its partition data.
     */
    public static final /* synthetic */ void $anonfun$testFetchRequests$2(Iterator it, TopicPartition topicPartition, FetchRequest.PartitionData partitionData) {
        Map.Entry expected = (Map.Entry) it.next();
        Object expectedPartition = expected.getKey();
        Assertions.assertEquals(expectedPartition, topicPartition);
        Object expectedData = expected.getValue();
        Assertions.assertEquals(expectedData, partitionData);
    }

    /**
     * Compiler-generated closure body: asserts that the visited partition equals the next
     * element of {@code iterator}; the partition data is not inspected.
     */
    public static final /* synthetic */ void $anonfun$testIncrementalFetchSession$1(scala.collection.Iterator iterator, TopicPartition topicPartition, FetchRequest.PartitionData partitionData) {
        Object expectedPartition = iterator.next();
        Assertions.assertEquals(expectedPartition, topicPartition);
    }
}
