package kafka.server;

import com.yammer.metrics.core.Meter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.ApiVersion;
import kafka.api.ApiVersion$;
import kafka.api.KAFKA_2_6_IV0$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogAppendInfo;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.server.metadata.ZkMetadataCache;
import kafka.server.metadata.ZkMetadataCache$;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.DelayedItem;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: ReplicaFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\r5f\u0001\u0002!B\u0001\u0019CQ!\u0014\u0001\u0005\u00029Cq!\u0015\u0001C\u0002\u0013%!\u000b\u0003\u0004_\u0001\u0001\u0006Ia\u0015\u0005\b?\u0002\u0011\r\u0011\"\u0003S\u0011\u0019\u0001\u0007\u0001)A\u0005'\"9\u0011\r\u0001b\u0001\n\u0013\u0011\u0006B\u00022\u0001A\u0003%1\u000bC\u0004d\u0001\t\u0007I\u0011\u00033\t\r-\u0004\u0001\u0015!\u0003f\u0011\u001da\u0007A1A\u0005\u00125Da!\u001d\u0001!\u0002\u0013q\u0007b\u0002:\u0001\u0005\u0004%Ia\u001d\u0005\u0007o\u0002\u0001\u000b\u0011\u0002;\t\u000fa\u0004!\u0019!C\u0005g\"1\u0011\u0010\u0001Q\u0001\nQDqA\u001f\u0001C\u0002\u0013%1\u0010C\u0004\u0002\u0016\u0001\u0001\u000b\u0011\u0002?\t\u0013\u0005]\u0001A1A\u0005\n\u0005e\u0001\u0002CA\u001e\u0001\u0001\u0006I!a\u0007\t\u0013\u0005u\u0002A1A\u0005\n\u0005}\u0002\u0002CA'\u0001\u0001\u0006I!!\u0011\t\u0013\u0005=\u0003A1A\u0005\n\u0005E\u0003\u0002CA0\u0001\u0001\u0006I!a\u0015\t\u000f\u0005\u0005\u0004\u0001\"\u0003\u0002d!I\u0011\u0011\u0012\u0001\u0012\u0002\u0013%\u00111\u0012\u0005\b\u0003C\u0003A\u0011AAR\u0011\u001d\t\t\r\u0001C\t\u0003\u0007D\u0011B!\u0019\u0001#\u0003%\tBa\u0019\t\u0013\t\u001d\u0004!%A\u0005\u0012\t%\u0004\"\u0003B7\u0001E\u0005I\u0011\u0003B8\u0011%\u0011\u0019\bAI\u0001\n#\u0011y\u0007C\u0004\u0003v\u0001!\t!a)\t\u000f\t}\u0004\u0001\"\u0001\u0002$\"9!1\u0011\u0001\u0005\u0002\t\u0015\u0005b\u0002BQ\u0001\u0011\u0005\u00111\u0015\u0005\b\u0005K\u0003A\u0011AAR\u0011\u001d\u0011I\u000b\u0001C\u0001\u0003GCqA!,\u0001\t#\u0011y\u000bC\u0005\u0003D\u0002\t\n\u0011\"\u0005\u0002\f\"9!Q\u0019\u0001\u0005\u0012\t\u001d\u0007b\u0002Bw\u0001\u0011\u0005!q\u001e\u0005\n\u0005s\u0004\u0011\u0013!C\u0001\u0003\u0017CqAa?\u0001\t\u0003\t\u0019\u000bC\u0004\u0003��\u0002!\t!a)\t\u000f\r\r\u0001\u0001\"\u0001\u0002$\"91q\u0001\u0001\u0005\u0002\u0005\r\u0006bBB\u0006\u0001\u0011\u0005\u00111\u0015\u0005\b\u0007\u001f\u0001A\u0011AAR\u0011\u001d\u0019\u0019\u0002\u0001C\u0001\u0003GCqaa\u0006\u0001\t\u0003\t\u0019\u000bC\u0004\u0004\u001c\u0001!\t!a)\t\u000f\r}\u0001\u0001\"\u0001\u0002$\"911\u0005\u0001\u0005\u0002\u0005\r\u0006bBB\u0014\u0001\u0011\u0005\u00111\u0015\u0005\b\u0007W\u0001A\u0011AAR\u0011\u001d\u0019y\u0003\u0001C\u0001\u0003GCqaa\r\u0001\t\u0003\t\u0019\u000bC\u0004\u00048\u0001!\t!a)\t\u000f\rm\u0002\u0001\"\u0003\u0004>!911\b\u0001\u0005\n\r-\u0004bBBB\u0001\u0011%1Q\u0011\u0005\b\u0007\u0017\u0003A\u0011ABG\u0011\u001d\u0019I\u000b\u0001C\u0005\u0007W\u0013\u0001DU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$G+Z:u\u0015\t\u00115)\u0001\u0004tKJ4XM\u001d\u0006\u0002\t\u0006)1.\u00194lC\u000e\u00011C\u0001\u0001H!\tA5*D\u0001J\u0015\u0005Q\u0015!B:dC2\f\u0017B\u0001'J\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012a\u0014\t\u0003!\u0002i\u0011!Q\u0001\u0005iF\u0002\b'F\u0001T!\t!F,D\u0001V\u0015\t1v+\u0001\u0004d_6lwN\u001c\u0006\u0003\tbS!!\u0017.\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Y\u0016aA8sO&\u0011Q,\u0016\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u0003\u0015!\u0018\u0007\u001d\u0019!\u0003\u0011!\u0018\u0007]\u0019\u0002\u000bQ\f\u0004/\r\u0011\u0002\tQ\u0014\u0004/M\u0001\u0006iJ\u0002\u0018\u0007I\u0001\u000fEJ|7.\u001a:F]\u0012\u0004v.\u001b8u+\u0005)\u0007C\u00014j\u001b\u00059'B\u00015D\u0003\u001d\u0019G.^:uKJL!A[4\u0003\u001d\t\u0013xn[3s\u000b:$\u0007k\\5oi\u0006y!M]8lKJ,e\u000e\u001a)pS:$\b%\u0001\tgC&dW\r\u001a)beRLG/[8ogV\ta\u000e\u0005\u0002Q_&\u0011\u0001/\u0011\u0002\u0011\r\u0006LG.\u001a3QCJ$\u0018\u000e^5p]N\f\u0011CZ1jY\u0016$\u0007+\u0019:uSRLwN\\:!\u0003!!x\u000e]5d\u0013\u0012\fT#\u0001;\u0011\u0005Q+\u0018B\u0001<V\u0005\u0011)V/\u001b3\u0002\u0013Q|\u0007/[2JIF\u0002\u0013\u0001\u0003;pa&\u001c\u0017\n\u001a\u001a\u0002\u0013Q|\u0007/[2JIJ\u0002\u0013\u0001\u0003;pa&\u001c\u0017\nZ:\u0016\u0003q\u0004b!`A\u0001\u0003\u000b!X\"\u0001@\u000b\u0005}L\u0015AC2pY2,7\r^5p]&\u0019\u00111\u0001@\u0003\u00075\u000b\u0007\u000f\u0005\u0003\u0002\b\u0005EQBAA\u0005\u0015\u0011\tY!!\u0004\u0002\t1\fgn\u001a\u0006\u0003\u0003\u001f\tAA[1wC&!\u00111CA\u0005\u0005\u0019\u0019FO]5oO\u0006IAo\u001c9jG&#7\u000fI\u0001\u0010a\u0006\u0014H/\u001b;j_:\u001cF/\u0019;fgV\u0011\u00111\u0004\t\u0007\u0003;\t\u0019#a\n\u000e\u0005\u0005}!\u0002BA\u0011\u0003\u001b\tA!\u001e;jY&!\u0011QEA\u0010\u0005\u0011a\u0015n\u001d;\u0011\t\u0005%\u0012Q\u0007\b\u0005\u0003W\t\t$\u0004\u0002\u0002.)\u0019\u0011qF+\u0002\u000f5,7o]1hK&!\u00111GA\u0017\u0003e)\u0006\u000fZ1uK6+G/\u00193bi\u0006\u0014V-];fgR$\u0015\r^1\n\t\u0005]\u0012\u0011\b\u0002\u001d+B$\u0017\r^3NKR\fG-\u0019;b!\u0006\u0014H/\u001b;j_:\u001cF/\u0019;f\u0015\u0011\t\u0019$!\f\u0002!A\f'\u000f^5uS>t7\u000b^1uKN\u0004\u0013!F;qI\u0006$X-T3uC\u0012\fG/\u0019*fcV,7\u000f^\u000b\u0003\u0003\u0003\u0002B!a\u0011\u0002J5\u0011\u0011Q\t\u0006\u0004\u0003\u000f*\u0016\u0001\u0003:fcV,7\u000f^:\n\t\u0005-\u0013Q\t\u0002\u0016+B$\u0017\r^3NKR\fG-\u0019;b%\u0016\fX/Z:u\u0003Y)\b\u000fZ1uK6+G/\u00193bi\u0006\u0014V-];fgR\u0004\u0013!D7fi\u0006$\u0017\r^1DC\u000eDW-\u0006\u0002\u0002TA!\u0011QKA.\u001b\t\t9FC\u0002\u0002Z\u0005\u000b\u0001\"\\3uC\u0012\fG/Y\u0005\u0005\u0003;\n9FA\b[W6+G/\u00193bi\u0006\u001c\u0015m\u00195f\u00039iW\r^1eCR\f7)Y2iK\u0002\n\u0011#\u001b8ji&\fGNR3uG\"\u001cF/\u0019;f)!\t)'a\u001b\u0002v\u0005}\u0004c\u0001)\u0002h%\u0019\u0011\u0011N!\u0003#%s\u0017\u000e^5bY\u001a+Go\u00195Ti\u0006$X\rC\u0004\u0002na\u0001\r!a\u001c\u0002\u000fQ|\u0007/[2JIB!\u0001*!\u001du\u0013\r\t\u0019(\u0013\u0002\u0007\u001fB$\u0018n\u001c8\t\u000f\u0005]\u0004\u00041\u0001\u0002z\u0005Ya-\u001a;dQ>3gm]3u!\rA\u00151P\u0005\u0004\u0003{J%\u0001\u0002'p]\u001eD\u0011\"!!\u0019!\u0003\u0005\r!a!\u0002\u00171,\u0017\rZ3s\u000bB|7\r\u001b\t\u0004\u0011\u0006\u0015\u0015bAAD\u0013\n\u0019\u0011J\u001c;\u00027%t\u0017\u000e^5bY\u001a+Go\u00195Ti\u0006$X\r\n3fM\u0006,H\u000e\u001e\u00134+\t\tiI\u000b\u0003\u0002\u0004\u0006=5FAAI!\u0011\t\u0019*!(\u000e\u0005\u0005U%\u0002BAL\u00033\u000b\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005m\u0015*\u0001\u0006b]:|G/\u0019;j_:LA!a(\u0002\u0016\n\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002\u000f\rdW-\u00198vaR\u0011\u0011Q\u0015\t\u0004\u0011\u0006\u001d\u0016bAAU\u0013\n!QK\\5uQ\rQ\u0012Q\u0016\t\u0005\u0003_\u000bi,\u0004\u0002\u00022*!\u00111WA[\u0003\r\t\u0007/\u001b\u0006\u0005\u0003o\u000bI,A\u0004kkBLG/\u001a:\u000b\u0007\u0005m&,A\u0003kk:LG/\u0003\u0003\u0002@\u0006E&!C!gi\u0016\u0014X)Y2i\u0003i\u0019'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1e)y\t)-a3\u0002d\u0006\u001d\u00181^A{\u0003o\u0014\tAa\u0004\u0003 \t%\"q\bB&\u0005/\u0012i\u0006E\u0002Q\u0003\u000fL1!!3B\u0005Q\u0011V\r\u001d7jG\u00064U\r^2iKJ$\u0006N]3bI\"9\u0011QZ\u000eA\u0002\u0005=\u0017\u0001\u00028b[\u0016\u0004B!!5\u0002`:!\u00111[An!\r\t).S\u0007\u0003\u0003/T1!!7F\u0003\u0019a$o\\8u}%\u0019\u0011Q\\%\u0002\rA\u0013X\rZ3g\u0013\u0011\t\u0019\"!9\u000b\u0007\u0005u\u0017\nC\u0004\u0002fn\u0001\r!a!\u0002\u0013\u0019,Go\u00195fe&#\u0007BBAu7\u0001\u0007Q-\u0001\u0007t_V\u00148-\u001a\"s_.,'\u000fC\u0004\u0002nn\u0001\r!a<\u0002\u0019\t\u0014xn[3s\u0007>tg-[4\u0011\u0007A\u000b\t0C\u0002\u0002t\u0006\u00131bS1gW\u0006\u001cuN\u001c4jO\")An\u0007a\u0001]\"9\u0011\u0011`\u000eA\u0002\u0005m\u0018A\u0003:fa2L7-Y'heB\u0019\u0001+!@\n\u0007\u0005}\u0018I\u0001\bSKBd\u0017nY1NC:\fw-\u001a:\t\u000f\t\r1\u00041\u0001\u0003\u0006\u00059Q.\u001a;sS\u000e\u001c\b\u0003\u0002B\u0004\u0005\u0017i!A!\u0003\u000b\u0007\t\rQ+\u0003\u0003\u0003\u000e\t%!aB'fiJL7m\u001d\u0005\b\u0005#Y\u0002\u0019\u0001B\n\u0003\u0011!\u0018.\\3\u0011\t\tU!1D\u0007\u0003\u0005/Q1A!\u0007V\u0003\u0015)H/\u001b7t\u0013\u0011\u0011iBa\u0006\u0003\tQKW.\u001a\u0005\b\u0005CY\u0002\u0019\u0001B\u0012\u0003\u0015\tXo\u001c;b!\r\u0001&QE\u0005\u0004\u0005O\t%\u0001\u0004*fa2L7-Y)v_R\f\u0007b\u0002B\u00167\u0001\u0007!QF\u0001\u0011i&,'o\u0015;bi\u00164U\r^2iKJ\u0004R\u0001SA9\u0005_\u0001BA!\r\u0003<5\u0011!1\u0007\u0006\u0005\u0005k\u00119$A\u0004gKR\u001c\u0007.\u001a:\u000b\u0007\te2)\u0001\u0003uS\u0016\u0014\u0018\u0002\u0002B\u001f\u0005g\u0011\u0001\u0003V5feN#\u0018\r^3GKR\u001c\u0007.\u001a:\t\u0013\t\u00053\u0004%AA\u0002\t\r\u0013A\u00077fC\u0012,'/\u00128ea>Lg\u000e\u001e\"m_\u000e\\\u0017N\\4TK:$\u0007#\u0002%\u0002r\t\u0015\u0003c\u0001)\u0003H%\u0019!\u0011J!\u0003\u0019\tcwnY6j]\u001e\u001cVM\u001c3\t\u0013\t53\u0004%AA\u0002\t=\u0013!\u00047pO\u000e{g\u000e^3yi>\u0003H\u000fE\u0003I\u0003c\u0012\t\u0006\u0005\u0003\u0003\u0016\tM\u0013\u0002\u0002B+\u0005/\u0011!\u0002T8h\u0007>tG/\u001a=u\u0011%\u0011If\u0007I\u0001\u0002\u0004\u0011Y&A\tgKR\u001c\u0007NQ1dW>3g-T:PaR\u0004R\u0001SA9\u0003sB\u0011Ba\u0018\u001c!\u0003\u0005\rAa\u0017\u0002)\u0019,Go\u00195CC\u000e\\wJ\u001a4Ng6\u000b\u0007p\u00149u\u0003\u0015\u001a'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1eI\u0011,g-Y;mi\u0012\n\u0014'\u0006\u0002\u0003f)\"!1IAH\u0003\u0015\u001a'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1eI\u0011,g-Y;mi\u0012\n$'\u0006\u0002\u0003l)\"!qJAH\u0003\u0015\u001a'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1eI\u0011,g-Y;mi\u0012\n4'\u0006\u0002\u0003r)\"!1LAH\u0003\u0015\u001a'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1eI\u0011,g-Y;mi\u0012\nD'\u0001\u0015tQ>,H\u000eZ*f]\u0012d\u0015\r^3tiJ+\u0017/^3tiZ+'o]5p]N\u0014\u0015\u0010R3gCVdG\u000fK\u0002!\u0005s\u0002B!a,\u0003|%!!QPAY\u0005\u0011!Vm\u001d;\u0002}Q,7\u000f\u001e$fi\u000eDG*Z1eKJ,\u0005o\\2i%\u0016\fX/Z:u\u0013\u001ad\u0015m\u001d;Fa>\u001c\u0007\u000eR3gS:,GMR8s'>lW\rU1si&$\u0018n\u001c8tQ\r\t#\u0011P\u0001\u0016CN\u001cXM\u001d;QCJ$\u0018\u000e^5p]N#\u0018\r^3t))\t)Ka\"\u0003\u0010\ne%Q\u0014\u0005\b\u0005k\u0011\u0003\u0019\u0001BE!\r\u0001&1R\u0005\u0004\u0005\u001b\u000b%!F!cgR\u0014\u0018m\u0019;GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u0005\b\u0005#\u0013\u0003\u0019\u0001BJ\u0003U\u0019\bn\\;mI\n+'+Z1es\u001a{'OR3uG\"\u00042\u0001\u0013BK\u0013\r\u00119*\u0013\u0002\b\u0005>|G.Z1o\u0011\u001d\u0011YJ\ta\u0001\u0005'\u000bQc\u001d5pk2$')\u001a+sk:\u001c\u0017\r^5oO2{w\rC\u0004\u0003 \n\u0002\rAa%\u0002\u001fMDw.\u001e7e\u0005\u0016$U\r\\1zK\u0012\fQe\u001d5pk2$\u0007*\u00198eY\u0016,\u0005pY3qi&|gN\u0012:p[\ncwnY6j]\u001e\u001cVM\u001c3)\u0007\r\u0012I(A\"tQ>,H\u000e\u001a$fi\u000eDG*Z1eKJ,\u0005o\\2i\u001f:4\u0015N]:u\r\u0016$8\r[(oYfLe\rT3bI\u0016\u0014X\t]8dQ.swn\u001e8U_\n{G\u000f[%caJ2\u0004f\u0001\u0013\u0003z\u0005A4\u000f[8vY\u0012tu\u000e\u001e$fi\u000eDG*Z1eKJ,\u0005o\\2i\u001f:4\u0015N]:u\r\u0016$8\r[,ji\"$&/\u001e8dCR,wJ\u001c$fi\u000eD\u0007fA\u0013\u0003z\u0005\u0011c/\u001a:jMf4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"$b!!*\u00032\n}\u0006b\u0002BZM\u0001\u0007!QW\u0001\u0004S\n\u0004\b\u0003\u0002B\\\u0005wk!A!/\u000b\u0007\u0005M6)\u0003\u0003\u0003>\ne&AC!qSZ+'o]5p]\"I!\u0011\u0019\u0014\u0011\u0002\u0003\u0007\u00111Q\u0001\u0010KB|7\r\u001b$fi\u000eD7i\\;oi\u0006ac/\u001a:jMf4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"$C-\u001a4bk2$HEM\u0001\u001bm\u0016\u0014\u0018NZ=PM\u001a\u001cX\r\u001e*fcV,7\u000f\u001e,feNLwN\u001c\u000b\t\u0003K\u0013IMa3\u0003^\"9!1\u0017\u0015A\u0002\tU\u0006b\u0002BgQ\u0001\u0007!qZ\u0001\u001c_\u001a47/\u001a;G_JdU-\u00193fe\u0016\u0003xn\u00195SKF,Xm\u001d;\u0011\t\tE'q\u001b\b\u0005\u0003\u0007\u0012\u0019.\u0003\u0003\u0003V\u0006\u0015\u0013\u0001H(gMN,Go\u001d$pe2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f^\u0005\u0005\u00053\u0014YNA\u0004Ck&dG-\u001a:\u000b\t\tU\u0017Q\t\u0005\b\u0005?D\u0003\u0019\u0001Bq\u0003Ia\u0017n\u001d;PM\u001a\u001cX\r^:SKF,Xm\u001d;\u0011\t\t\r(\u0011\u001e\b\u0005\u0003\u0007\u0012)/\u0003\u0003\u0003h\u0006\u0015\u0013A\u0005'jgR|eMZ:fiN\u0014V-];fgRLAA!7\u0003l*!!q]A#\u0003e1XM]5gs6\u000b'o\u001b*fa2L7-\u0019+ie>$H\u000f\\3\u0015\r\u0005\u0015&\u0011\u001fB{\u0011\u001d\u0011\u00190\u000ba\u0001\u0003w\faB]3qY&\u001c\u0017-T1oC\u001e,'\u000fC\u0005\u0003x&\u0002\n\u00111\u0001\u0002\u0004\u0006)A/[7fg\u0006\u0019c/\u001a:jMfl\u0015M]6SKBd\u0017nY1UQJ|G\u000f\u001e7fI\u0011,g-Y;mi\u0012\u0012\u0014!H:i_VdG\r\u00165s_R$H.\u001a$pY2|w/\u001a:SKBd\u0017nY1)\u0007-\u0012I(\u0001\u0011uKN$hi\u001c7m_^,'/S:UQJ|G\u000f\u001e7fI>sGj\\<ESN\\\u0007f\u0001\u0017\u0003z\u0005!4\u000f[8vY\u0012$&/\u001e8dCR,Gk\\(gMN,Go\u00159fG&4\u0017.\u001a3J]\u0016\u0003xn\u00195PM\u001a\u001cX\r\u001e*fgB|gn]3)\u00075\u0012I(A'tQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_>3gm]3u'B,7-\u001b4jK\u0012Le.\u00129pG\"|eMZ:fiJ+7\u000f]8og\u0016LeMR8mY><XM\u001d%bg:{Wj\u001c:f\u000bB|7\r[:)\u00079\u0012I(\u0001&tQ>,H\u000e\u001a$fi\u000eDG*Z1eKJ,\u0005o\\2i'\u0016\u001cwN\u001c3US6,\u0017J\u001a'fC\u0012,'OU3qY&,7oV5uQ\u0016\u0003xn\u00195O_R\\en\\<o)>4u\u000e\u001c7po\u0016\u0014\bfA\u0018\u0003z\u0005\t5\u000f[8vY\u0012$&/\u001e8dCR,\u0017J\u001a'fC\u0012,'OU3qY&,7oV5uQ\u0012Kg/\u001a:hS:<W\t]8dQ:{Go\u00138po:$vNR8mY><XM\u001d\u0015\u0004a\te\u0014!\f;fgR$&/\u001e8dCR,wJ\u001c$fi\u000eDGi\\3t\u001d>$X\u000b\u001d3bi\u0016D\u0015n\u001a5XCR,'/\\1sW\"\u001a\u0011G!\u001f\u0002gMDw.\u001e7e+N,G*Z1eKJ,e\u000eZ(gMN,G/\u00134J]R,'O\u0011:pW\u0016\u0014h+\u001a:tS>t')\u001a7poJ\u0002\u0004f\u0001\u001a\u0003z\u0005\u00015\u000f[8vY\u0012$&/\u001e8dCR,Gk\\%oSRL\u0017\r\u001c$fi\u000eDwJ\u001a4tKRLe\rT3bI\u0016\u0014(+\u001a;ve:\u001cXK\u001c3fM&tW\rZ(gMN,G\u000fK\u00024\u0005s\n\u0011g\u001d5pk2$\u0007k\u001c7m\u0013:$WMZ5oSR,G._%g\u0019\u0016\fG-\u001a:SKR,(O\\:B]f,\u0005pY3qi&|g\u000eK\u00025\u0005s\n1f\u001d5pk2$Wj\u001c<f!\u0006\u0014H/\u001b;j_:\u001cx*\u001e;PMR\u0013XO\\2bi&tw\rT8h'R\fG/\u001a\u0015\u0004k\te\u0014\u0001O:i_VdGMR5mi\u0016\u0014\b+\u0019:uSRLwN\\:NC\u0012,G*Z1eKJ$UO]5oO2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f\u001e\u0015\u0004m\te\u0014\u0001S:i_VdGmQ1uG\",\u0005pY3qi&|gN\u0012:p[\ncwnY6j]\u001e\u001cVM\u001c3XQ\u0016t7\u000b[;ui&tw\rR8x]J+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1eQ\r9$\u0011P\u0001'g\"|W\u000f\u001c3Va\u0012\fG/\u001a*fCN\u001c\u0018n\u001a8nK:$()\u001f;fg&sW*\u001a;sS\u000e\u001c\bf\u0001\u001d\u0003z\u000515\u000f[8vY\u0012tu\u000e^+qI\u0006$XMU3bgNLwM\\7f]R\u0014\u0015\u0010^3t\u0013:lU\r\u001e:jGN<\u0006.\u001a8O_J+\u0017m]:jO:lWM\u001c;t\u0013:\u0004&o\\4sKN\u001c\bfA\u001d\u0003z\u0005qA/Z:u\u0005VLG\u000e\u001a$fi\u000eD\u0007f\u0001\u001e\u0003z\u0005\tc.Z<PM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s!\u0006\u0014H/\u001b;j_:\u0014Vm];miRA1qHB1\u0007K\u001a9\u0007\u0005\u0003\u0004B\rmc\u0002BB\"\u0007/rAa!\u0012\u0004V9!1qIB*\u001d\u0011\u0019Ie!\u0015\u000f\t\r-3q\n\b\u0005\u0003+\u001ci%C\u0001\\\u0013\tI&,\u0003\u0002E1&\u0011akV\u0005\u0004\u0003_)\u0016\u0002BB-\u0003[\t\u0001e\u00144gg\u0016$hi\u001c:MK\u0006$WM]#q_\u000eD'+Z:q_:\u001cX\rR1uC&!1QLB0\u00059)\u0005o\\2i\u000b:$wJ\u001a4tKRTAa!\u0017\u0002.!111M\u001eA\u0002M\u000b!\u0001\u001e9\t\u000f\u0005\u00055\b1\u0001\u0002\u0004\"91\u0011N\u001eA\u0002\u0005e\u0014!C3oI>3gm]3u))\u0019yd!\u001c\u0004p\r}4\u0011\u0011\u0005\u0007\u0007Gb\u0004\u0019A*\t\u000f\rED\b1\u0001\u0004t\u0005)QM\u001d:peB!1QOB>\u001b\t\u00199HC\u0002\u0004zU\u000b\u0001\u0002\u001d:pi>\u001cw\u000e\\\u0005\u0005\u0007{\u001a9H\u0001\u0004FeJ|'o\u001d\u0005\b\u0003\u0003c\u0004\u0019AAB\u0011\u001d\u0019I\u0007\u0010a\u0001\u0003s\na$Y:tKJ$\bK]8dKN\u001c\b+\u0019:uSRLwN\u001c#bi\u0006<\u0006.\u001a8\u0015\t\u0005\u00156q\u0011\u0005\b\u0007\u0013k\u0004\u0019\u0001BJ\u00035I7OU3bgNLwM\\5oO\u0006!1\u000f^;c)!\t)ka$\u0004\u001a\u000em\u0005bBBI}\u0001\u000711S\u0001\na\u0006\u0014H/\u001b;j_:\u00042AZBK\u0013\r\u00199j\u001a\u0002\n!\u0006\u0014H/\u001b;j_:DqAa=?\u0001\u0004\tY\u0010C\u0004\u0004\u001ez\u0002\raa(\u0002\u00071|w\r\u0005\u0003\u0004\"\u000e\u0015VBABR\u0015\r\u0019ijQ\u0005\u0005\u0007O\u001b\u0019KA\u0006BEN$(/Y2u\u0019><\u0017\u0001H6bM.\f7i\u001c8gS\u001etu\u000e\u0016:v]\u000e\fG/Z(o\r\u0016$8\r[\u000b\u0003\u0003_\u0004")
/* loaded from: input_file:kafka/server/ReplicaFetcherThreadTest.class */
public class ReplicaFetcherThreadTest {
    private final TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    private final FailedPartitions failedPartitions = new FailedPartitions();
    private final Uuid topicId1 = Uuid.randomUuid();
    private final Uuid topicId2 = Uuid.randomUuid();
    private final Map<String, Uuid> topicIds = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic1"), topicId1()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic2"), topicId2())}));
    private final List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates = (List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic1").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic2").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), Nil$.MODULE$))).asJava();
    private final UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 0, 0, 0, partitionStates(), Collections.emptyList(), (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(topicIds()).asJava()).build();
    private final ZkMetadataCache metadataCache = new ZkMetadataCache(0, ZkMetadataCache$.MODULE$.$lessinit$greater$default$2());

    public TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0() {
        return this.kafka$server$ReplicaFetcherThreadTest$$t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private TopicPartition t2p1() {
        return this.t2p1;
    }

    public BrokerEndPoint brokerEndPoint() {
        return this.brokerEndPoint;
    }

    public FailedPartitions failedPartitions() {
        return this.failedPartitions;
    }

    private Uuid topicId1() {
        return this.topicId1;
    }

    private Uuid topicId2() {
        return this.topicId2;
    }

    private Map<String, Uuid> topicIds() {
        return this.topicIds;
    }

    private List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates() {
        return this.partitionStates;
    }

    private UpdateMetadataRequest updateMetadataRequest() {
        return this.updateMetadataRequest;
    }

    private ZkMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private InitialFetchState initialFetchState(Option<Uuid> option, long j, int i) {
        return new InitialFetchState(option, new BrokerEndPoint(0, "localhost", 9092), i, j);
    }

    private int initialFetchState$default$3() {
        return 1;
    }

    @AfterEach
    public void cleanup() {
        TestUtils$.MODULE$.clearYammerMetrics();
    }

    public ReplicaFetcherThread createReplicaFetcherThread(String str, int i, BrokerEndPoint brokerEndPoint, KafkaConfig kafkaConfig, FailedPartitions failedPartitions, ReplicaManager replicaManager, Metrics metrics, Time time, ReplicaQuota replicaQuota, Option<TierStateFetcher> option, Option<BlockingSend> option2, Option<LogContext> option3, Option<Object> option4, Option<Object> option5) {
        return new ReplicaFetcherThread(str, i, brokerEndPoint, kafkaConfig, failedPartitions, replicaManager, metrics, time, replicaQuota, option2, option3, ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$12(), option4, option5);
    }

    public Option<BlockingSend> createReplicaFetcherThread$default$11() {
        return None$.MODULE$;
    }

    public Option<LogContext> createReplicaFetcherThread$default$12() {
        return None$.MODULE$;
    }

    public Option<Object> createReplicaFetcherThread$default$13() {
        return None$.MODULE$;
    }

    public Option<Object> createReplicaFetcherThread$default$14() {
        return None$.MODULE$;
    }

    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), QuotaFactory$UnboundedQuota$.MODULE$, None$.MODULE$, None$.MODULE$, createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        Assertions.assertEquals(ApiKeys.FETCH.latestVersion(), createReplicaFetcherThread.fetchRequestVersion());
        Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), createReplicaFetcherThread.offsetForLeaderEpochRequestVersion());
        Assertions.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), createReplicaFetcherThread.listOffsetRequestVersion());
    }

    @Test
    public void testFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5))).thenReturn(new Some(BoxesRunTime.boxToInteger(5))).thenReturn(None$.MODULE$);
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId2()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, initialFetchState$default$3()))})));
        assertPartitionStates(createReplicaFetcherThread, false, true, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(3, replicaFetcherMockBlockingSend.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        ((Partition) Mockito.verify(partition, Mockito.times(3))).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
    }

    public void assertPartitionStates(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3) {
        new $colon.colon(kafka$server$ReplicaFetcherThreadTest$$t1p0(), new $colon.colon(t1p1(), new $colon.colon(t2p1(), Nil$.MODULE$))).foreach(topicPartition -> {
            $anonfun$assertPartitionStates$1(abstractFetcherThread, z, z2, z3, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.sendRequest((AbstractRequest.Builder) ArgumentMatchers.any())).thenThrow(new Throwable[]{new NullPointerException()});
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Assertions.assertEquals(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))})), createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), null, None$.MODULE$, new Some(blockingSend), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14()).fetchEpochEndOffsets(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLeaderEpoch(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p1().partition()).setLeaderEpoch(0))}))), "results from leader epoch request should have undefined offset");
        ((BlockingSend) Mockito.verify(blockingSend)).sendRequest((AbstractRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBothIbp26() {
        verifyFetchLeaderEpochOnFirstFetch(KAFKA_2_6_IV0$.MODULE$, verifyFetchLeaderEpochOnFirstFetch$default$2());
    }

    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        verifyFetchLeaderEpochOnFirstFetch(ApiVersion$.MODULE$.latestVersion(), 0);
    }

    public void verifyFetchLeaderEpochOnFirstFetch(ApiVersion apiVersion, int i) {
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), apiVersion.version());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), QuotaFactory$UnboundedQuota$.MODULE$, None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(i, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(i, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.fetchCount());
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(i, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(3, replicaFetcherMockBlockingSend.fetchCount());
        if (i > 0) {
            verifyOffsetRequestVersion(apiVersion, (OffsetsForLeaderEpochRequest.Builder) replicaFetcherMockBlockingSend.lastUsedOffsetsForLeaderEpochRequest().get(), createReplicaFetcherThread.listOffsetRequestBuilder((ListOffsetsRequestData.ListOffsetsTopic) EasyMock.createMock(ListOffsetsRequestData.ListOffsetsTopic.class)));
        }
    }

    public int verifyFetchLeaderEpochOnFirstFetch$default$2() {
        return 1;
    }

    public void verifyOffsetRequestVersion(ApiVersion apiVersion, OffsetsForLeaderEpochRequest.Builder builder, ListOffsetsRequest.Builder builder2) {
        KAFKA_2_6_IV0$ kafka_2_6_iv0$ = KAFKA_2_6_IV0$.MODULE$;
        if (apiVersion != null ? !apiVersion.equals(kafka_2_6_iv0$) : kafka_2_6_iv0$ != null) {
            Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), builder.oldestAllowedVersion());
            Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), builder.latestAllowedVersion());
            Assertions.assertEquals(0, builder2.oldestAllowedVersion());
            Assertions.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), builder2.latestAllowedVersion());
            return;
        }
        Assertions.assertEquals(3, builder.oldestAllowedVersion());
        Assertions.assertEquals(3, builder.latestAllowedVersion());
        Assertions.assertEquals(0, builder2.oldestAllowedVersion());
        Assertions.assertEquals(5, builder2.latestAllowedVersion());
    }

    public void verifyMarkReplicaThrottle(ReplicaManager replicaManager, int i) {
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(i))).markFollowerReplicaThrottle();
    }

    public int verifyMarkReplicaThrottle$default$2() {
        return 1;
    }

    @Test
    public void shouldThrottleFollowerReplica() {
        LazyRef lazyRef = new LazyRef();
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), KAFKA_2_6_IV0$.MODULE$.version());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setLeaderEpoch(5).setEndOffset(100L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(1).setLeaderEpoch(5).setEndOffset(1L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), Quota$2(lazyRef), None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertEquals(new Some(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{t1p1()}))), replicaFetcherMockBlockingSend.lastFetchRequest().map(builder -> {
            return (Set) CollectionConverters$.MODULE$.asScalaSetConverter(builder.fetchData().keySet()).asScala();
        }));
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
        verifyMarkReplicaThrottle(replicaManager, 1);
    }

    @Test
    public void testFollowerIsThrottledOnLowDisk() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        Mockito.when(BoxesRunTime.boxToBoolean(replicationQuotaManager.isQuotaExceeded())).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToBoolean(replicationQuotaManager.isThrottled((TopicPartition) EasyMock.anyObject(TopicPartition.class)))).thenReturn(BoxesRunTime.boxToBoolean(true));
        AtomicReference atomicReference = new AtomicReference(new Some(BoxesRunTime.boxToLong(42L)));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(atomicReference, new AtomicReference[]{atomicReference, atomicReference, null});
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("audi", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(5).setEndOffset(100L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(5).setEndOffset(1L))}))).asJava(), brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        Map apply = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), PartitionFetchState$.MODULE$.apply(new Some(topicId1()), 0L, new Some(BoxesRunTime.boxToLong(0L)), 5, Fetching$.MODULE$, None$.MODULE$, 0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), PartitionFetchState$.MODULE$.apply(new Some(topicId1()), 0L, None$.MODULE$, 5, Fetching$.MODULE$, None$.MODULE$, 0))}));
        createReplicaFetcherThread.buildFetch(apply);
        DiskUsageBasedThrottler$.MODULE$.registerListener(replicationQuotaManager);
        createReplicaFetcherThread.buildFetch(apply);
        DiskUsageBasedThrottler$.MODULE$.deRegisterListener(replicationQuotaManager);
        createReplicaFetcherThread.buildFetch(apply);
        verifyMarkReplicaThrottle(replicaManager, 2);
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 1));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(200, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 5, 172L))}))).asJava(), brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(156)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(172)), new StringBuilder(58).append("Expected ").append(t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 3));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(None$.MODULE$);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 4, 202L))}))).asJava(), brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(156)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(200)), new StringBuilder(55).append("Expected ").append(t2p1()).append(" to truncate to offset ").append(200).append(" (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(abstractLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 4, 143L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.fetchCount());
        replicaFetcherMockBlockingSend.setOffsetsForNextResponse((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 3, 101L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 3, 102L))}))).asJava());
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertTrue(replicaFetcherMockBlockingSend.lastUsedOffsetForLeaderEpochVersion() >= 3, "OffsetsForLeaderEpochRequest version.");
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(4))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(102)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 102 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(101)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 101 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        final KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        final ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        final ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ObjectRef create = ObjectRef.create(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(115L));
        Mockito.when(abstractLog.latestEpoch()).thenAnswer(invocationOnMock -> {
            return (Option) create.elem;
        });
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(149L, 4)));
        Mockito.when(abstractLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(129L, 2)));
        Mockito.when(abstractLog.endOffsetForEpoch(2)).thenReturn(new Some(new OffsetAndEpoch(119L, 1)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        final ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend(Collections.emptyMap(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread(this, fromProps, replicaManager, replicationQuotaManager, replicaFetcherMockBlockingSend) { // from class: kafka.server.ReplicaFetcherThreadTest$$anon$1
            public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long j, FetchResponseData.PartitionData partitionData) {
                return None$.MODULE$;
            }

            {
                BrokerEndPoint brokerEndPoint = this.brokerEndPoint();
                FailedPartitions failedPartitions = this.failedPartitions();
                Metrics metrics = new Metrics();
                SystemTime systemTime = new SystemTime();
                Some some = new Some(replicaFetcherMockBlockingSend);
                Option $lessinit$greater$default$11 = ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$11();
                Map $lessinit$greater$default$12 = ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$12();
                Option $lessinit$greater$default$13 = ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$13();
                Option $lessinit$greater$default$14 = ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$14();
            }
        };
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 200, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 200, initialFetchState$default$3()))})));
        scala.collection.immutable.Set apply = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0(), t1p1()}));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        apply.foreach(topicPartition -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$2(replicaFetcherThread, topicPartition);
            return BoxedUnit.UNIT;
        });
        replicaFetcherMockBlockingSend.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(141L)))})));
        replicaFetcherMockBlockingSend.setIdsForNextResponse(topicIds());
        create.elem = new Some(BoxesRunTime.boxToInteger(4));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(140)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 140 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(141)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 141 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        apply.foreach(topicPartition2 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$3(replicaFetcherThread, topicPartition2);
            return BoxedUnit.UNIT;
        });
        replicaFetcherMockBlockingSend.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(130L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(131L)))})));
        replicaFetcherMockBlockingSend.setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(3, replicaFetcherMockBlockingSend.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(4))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(129)), new StringBuilder(57).append("Expected to truncate to offset 129 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        apply.foreach(topicPartition3 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(replicaFetcherThread, topicPartition3);
            return BoxedUnit.UNIT;
        });
        replicaFetcherMockBlockingSend.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(120L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(121L)))})));
        replicaFetcherMockBlockingSend.setIdsForNextResponse(topicIds());
        create.elem = None$.MODULE$;
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(4, replicaFetcherMockBlockingSend.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(6))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(119)), new StringBuilder(57).append("Expected to truncate to offset 119 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        apply.foreach(topicPartition4 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(replicaFetcherThread, topicPartition4);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void testTruncateOnFetchDoesNotUpdateHighWatermark() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(130));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(149L, 4)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(150));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.localLogOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(partition);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()))).thenReturn(None$.MODULE$);
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend(Collections.emptyMap(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("fetcher-thread", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new Some(replicaFetcherMockBlockingSend), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$11(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$12(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$13(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$14());
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 150, initialFetchState$default$3()))})));
        replicaFetcherMockBlockingSend.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new FetchResponseData.PartitionData().setPartitionIndex(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setHighWatermark(160L).setDivergingEpoch(new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L)))})));
        replicaFetcherMockBlockingSend.setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(1))).truncateTo(140L, false);
        ((AbstractLog) Mockito.verify(abstractLog, Mockito.times(0))).updateHighWatermark(ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        createBrokerConfig.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(abstractLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), -1, 143L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.lastUsedOffsetForLeaderEpochVersion(), "OffsetsForLeaderEpochRequest version.");
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(155)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 155 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(143)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 143 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(100));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, -1L))}))).asJava(), brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 100, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(100, BoxesRunTime.unboxToLong(forClass.getValue()));
    }

    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(100));
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(300, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(300));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        java.util.Map map = (java.util.Map) CollectionConverters$.MODULE$.mutableMapAsJavaMapConverter(scala.collection.mutable.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.NOT_LEADER_OR_FOLLOWER, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))).asJava();
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend(map, brokerEndPoint(), new SystemTime())), None$.MODULE$, new Some(BoxesRunTime.boxToLong(100L)), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))})));
        int[] iArr = {100, 200, 400, 800};
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(i -> {
            createReplicaFetcherThread.doWork();
            Assertions.assertEquals(iArr[i], ((DelayedItem) ((PartitionFetchState) createReplicaFetcherThread.fetchState(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).get()).delay().get()).delayMs());
        });
        ((Partition) Mockito.verify(partition, Mockito.never())).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
        map.put(kafka$server$ReplicaFetcherThreadTest$$t1p0(), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 156L));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(156L, BoxesRunTime.unboxToLong(forClass.getValue()));
    }

    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(4)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(0L, 4)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 4, 1L))}))).asJava(), brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))})));
        Assertions.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), createReplicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).map(partitionFetchState -> {
            return partitionFetchState.state();
        }));
        Assertions.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), createReplicaFetcherThread.fetchState(t1p1()).map(partitionFetchState2 -> {
            return partitionFetchState2.state();
        }));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), createReplicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).map(partitionFetchState3 -> {
            return partitionFetchState3.state();
        }));
        Assertions.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), createReplicaFetcherThread.fetchState(t1p1()).map(partitionFetchState4 -> {
            return partitionFetchState4.state();
        }));
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(0L, false);
    }

    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(100 - 2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(100, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(100));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 52L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 49L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))})));
        TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = kafka$server$ReplicaFetcherThreadTest$$t1p0();
        replicaFetcherMockBlockingSend.setEpochRequestCallback(() -> {
            createReplicaFetcherThread.removePartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0})));
        });
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(49L, BoxesRunTime.unboxToLong(forClass.getValue()));
    }

    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        blockingSend.initiateClose();
        Mockito.when(BoxedUnit.UNIT).thenThrow(new Throwable[]{new IllegalArgumentException()});
        blockingSend.close();
        Mockito.when(BoxedUnit.UNIT).thenThrow(new Throwable[]{new IllegalStateException()});
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), null, None$.MODULE$, new Some(blockingSend), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.start();
        createReplicaFetcherThread.initiateShutdown();
        createReplicaFetcherThread.awaitShutdown();
        ((BlockingSend) Mockito.verify(blockingSend)).initiateClose();
        ((BlockingSend) Mockito.verify(blockingSend)).close();
    }

    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        assertProcessPartitionDataWhen(true);
    }

    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        assertProcessPartitionDataWhen(false);
    }

    @Test
    public void testBuildFetch() {
        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId1(), kafka$server$ReplicaFetcherThreadTest$$t1p0());
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(topicId1(), t1p1());
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(topicId2(), t2p1());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        ReplicaQuota replicaQuota = (ReplicaQuota) Mockito.mock(ReplicaQuota.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaQuota.isThrottled((TopicPartition) ArgumentMatchers.any()))).thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicaQuota, new Some(blockingSend), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$11(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$12(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$13(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$14());
        Map apply = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new PartitionFetchState(new Some(topicId1()), 150L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(new Some(topicId1()), 155L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), new PartitionFetchState(new Some(topicId2()), 160L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8()))}));
        AbstractFetcherThread.ResultWithPartitions buildFetch = replicaFetcherThread.buildFetch(apply);
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) buildFetch.result();
        Assertions.assertTrue(option.isDefined());
        FetchRequest.Builder fetchRequest = ((AbstractFetcherThread.ReplicaFetch) option.get()).fetchRequest();
        Assertions.assertEquals(CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map) apply.map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple2._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple2._2();
            return new Tuple2(topicPartition, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        }, Map$.MODULE$.canBuildFrom())).asJava(), fetchRequest.fetchData());
        Assertions.assertEquals(0, fetchRequest.replaced().size());
        Assertions.assertEquals(0, fetchRequest.removed().size());
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new FetchResponseData.PartitionData());
        linkedHashMap.put(topicIdPartition2, new FetchResponseData.PartitionData());
        linkedHashMap.put(topicIdPartition3, new FetchResponseData.PartitionData());
        replicaFetcherThread.fetchSessionHandler().handleResponse(FetchResponse.of(Errors.NONE, 0, 123, linkedHashMap), ApiKeys.FETCH.latestVersion());
        Map apply2 = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(new Some(topicId1()), 155L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), new PartitionFetchState(new Some(Uuid.randomUuid()), 160L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8()))}));
        AbstractFetcherThread.ResultWithPartitions buildFetch2 = replicaFetcherThread.buildFetch(apply2);
        if (buildFetch2 == null) {
            throw new MatchError((Object) null);
        }
        Option option2 = (Option) buildFetch2.result();
        Map map = (Map) ((TraversableLike) apply2.drop(1)).map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple22._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple22._2();
            return new Tuple2(topicPartition, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        }, Map$.MODULE$.canBuildFrom());
        Assertions.assertTrue(option2.isDefined());
        FetchRequest.Builder fetchRequest2 = ((AbstractFetcherThread.ReplicaFetch) option2.get()).fetchRequest();
        Assertions.assertEquals(CollectionConverters$.MODULE$.mapAsJavaMapConverter(map).asJava(), fetchRequest2.fetchData());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition3), fetchRequest2.replaced());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition), fetchRequest2.removed());
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, int i, long j) {
        return newOffsetForLeaderPartitionResult(topicPartition, Errors.NONE, i, j);
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, Errors errors, int i, long j) {
        return new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(topicPartition.partition()).setErrorCode(errors.code()).setLeaderEpoch(i).setEndOffset(j);
    }

    private void assertProcessPartitionDataWhen(boolean z) {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(partition.isReassigning())).thenReturn(BoxesRunTime.boxToBoolean(z));
        Mockito.when(BoxesRunTime.boxToBoolean(partition.isAddingLocalReplica())).thenReturn(BoxesRunTime.boxToBoolean(z));
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()))).thenReturn(None$.MODULE$);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(partition);
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(brokerTopicStats);
        createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), (ReplicaQuota) Mockito.mock(ReplicaQuota.class), None$.MODULE$, new Some(blockingSend), createReplicaFetcherThread$default$12(), createReplicaFetcherThread$default$13(), createReplicaFetcherThread$default$14()).processPartitionData(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 0L, new FetchResponseData.PartitionData().setPartitionIndex(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))})));
        if (z) {
            Assertions.assertEquals(r0.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        } else {
            Assertions.assertEquals(0L, ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        }
        Assertions.assertEquals(r0.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().replicationBytesInRate().get()).count());
    }

    public void stub(Partition partition, ReplicaManager replicaManager, AbstractLog abstractLog) {
        Mockito.when(replicaManager.localLogOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.localLogOrException(t1p1())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(t1p1())).thenReturn(partition);
        Mockito.when(replicaManager.localLogOrException(t2p1())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(t2p1())).thenReturn(partition);
        Mockito.when(partition.getLinkedLeaderEpoch()).thenReturn(None$.MODULE$);
    }

    private KafkaConfig kafkaConfigNoTruncateOnFetch() {
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), KAFKA_2_6_IV0$.MODULE$.version());
        return KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
    }

    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3, TopicPartition topicPartition) {
        Assertions.assertTrue(abstractFetcherThread.fetchState(topicPartition).isDefined());
        PartitionFetchState partitionFetchState = (PartitionFetchState) abstractFetcherThread.fetchState(topicPartition).get();
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z), BoxesRunTime.boxToBoolean(partitionFetchState.isReadyForFetch()), new StringBuilder(39).append("Partition ").append(topicPartition).append(" should").append((Object) (!z ? " NOT" : "")).append(" be ready for fetching").toString());
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z2), BoxesRunTime.boxToBoolean(partitionFetchState.isTruncating()), new StringBuilder(39).append("Partition ").append(topicPartition).append(" should").append((Object) (!z2 ? " NOT" : "")).append(" be truncating its log").toString());
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z3), BoxesRunTime.boxToBoolean(partitionFetchState.isDelayed()), new StringBuilder(28).append("Partition ").append(topicPartition).append(" should").append((Object) (!z3 ? " NOT" : "")).append(" be delayed").toString());
    }

    private final /* synthetic */ ReplicaFetcherThreadTest$Quota$1$ Quota$lzycompute$1(LazyRef lazyRef) {
        ReplicaFetcherThreadTest$Quota$1$ replicaFetcherThreadTest$Quota$1$;
        synchronized (lazyRef) {
            replicaFetcherThreadTest$Quota$1$ = lazyRef.initialized() ? (ReplicaFetcherThreadTest$Quota$1$) lazyRef.value() : (ReplicaFetcherThreadTest$Quota$1$) lazyRef.initialize(new ReplicaQuota(this) { // from class: kafka.server.ReplicaFetcherThreadTest$Quota$1$
                private final /* synthetic */ ReplicaFetcherThreadTest $outer;

                public boolean isThrottled(TopicPartition topicPartition) {
                    TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = this.$outer.kafka$server$ReplicaFetcherThreadTest$$t1p0();
                    return topicPartition == null ? kafka$server$ReplicaFetcherThreadTest$$t1p0 == null : topicPartition.equals(kafka$server$ReplicaFetcherThreadTest$$t1p0);
                }

                public boolean isQuotaExceeded() {
                    return true;
                }

                public void record(long j) {
                }

                {
                    if (this == null) {
                        throw null;
                    }
                    this.$outer = this;
                }
            });
        }
        return replicaFetcherThreadTest$Quota$1$;
    }

    private final ReplicaFetcherThreadTest$Quota$1$ Quota$2(LazyRef lazyRef) {
        return lazyRef.initialized() ? (ReplicaFetcherThreadTest$Quota$1$) lazyRef.value() : Quota$lzycompute$1(lazyRef);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$2(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    private static final FetchResponseData.PartitionData partitionData$1(int i, FetchResponseData.EpochEndOffset epochEndOffset) {
        return new FetchResponseData.PartitionData().setPartitionIndex(i).setLastStableOffset(0L).setLogStartOffset(0L).setDivergingEpoch(epochEndOffset);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$3(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public ReplicaFetcherThreadTest() {
        metadataCache().updateMetadata(0, updateMetadataRequest());
    }
}
