package kafka.server;

import com.yammer.metrics.core.Meter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.ApiVersion;
import kafka.api.ApiVersion$;
import kafka.api.KAFKA_2_6_IV0$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogAppendInfo;
import kafka.log.LogManager;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ReplicaFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\tMg\u0001\u0002\u00192\u0001YBQ!\u0010\u0001\u0005\u0002yBq!\u0011\u0001C\u0002\u0013%!\t\u0003\u0004O\u0001\u0001\u0006Ia\u0011\u0005\b\u001f\u0002\u0011\r\u0011\"\u0003C\u0011\u0019\u0001\u0006\u0001)A\u0005\u0007\"9\u0011\u000b\u0001b\u0001\n\u0013\u0011\u0005B\u0002*\u0001A\u0003%1\tC\u0004T\u0001\t\u0007I\u0011\u0003+\t\rm\u0003\u0001\u0015!\u0003V\u0011\u001da\u0006A1A\u0005\u0012uCa!\u0019\u0001!\u0002\u0013q\u0006\"\u00022\u0001\t\u0013\u0019\u0007bB9\u0001#\u0003%IA\u001d\u0005\u0006{\u0002!\tA \u0005\b\u00037\u0001A\u0011CA\u000f\u0011%\t9\fAI\u0001\n#\tI\fC\u0005\u0002>\u0002\t\n\u0011\"\u0005\u0002@\"1\u00111\u0019\u0001\u0005\u0002yDa!!4\u0001\t\u0003q\bbBAi\u0001\u0011\u0005\u00111\u001b\u0005\u0007\u0003_\u0004A\u0011\u0001@\t\r\u0005M\b\u0001\"\u0001\u007f\u0011\u0019\t9\u0010\u0001C\u0001}\"9\u00111 \u0001\u0005\u0012\u0005u\b\u0002\u0003B\t\u0001E\u0005I\u0011\u0003:\t\u000f\tM\u0001\u0001\"\u0001\u0003\u0016!A!q\u0004\u0001\u0012\u0002\u0013\u0005!\u000f\u0003\u0004\u0003\"\u0001!\tA \u0005\u0007\u0005K\u0001A\u0011\u0001@\t\r\t%\u0002\u0001\"\u0001\u007f\u0011\u0019\u0011i\u0003\u0001C\u0001}\"1!\u0011\u0007\u0001\u0005\u0002yDaA!\u000e\u0001\t\u0003q\bB\u0002B\u001d\u0001\u0011\u0005a\u0010\u0003\u0004\u0003>\u0001!\tA \u0005\u0007\u0005\u0003\u0002A\u0011\u0001@\t\r\t\u0015\u0003\u0001\"\u0001\u007f\u0011\u0019\u0011I\u0005\u0001C\u0001}\"1!Q\n\u0001\u0005\u0002yDaA!\u0015\u0001\t\u0003q\bB\u0002B+\u0001\u0011\u0005a\u0010\u0003\u0004\u0003Z\u0001!\tA \u0005\b\u0005;\u0002A\u0011\u0002B0\u0011\u001d\u0011i\u0006\u0001C\u0005\u0005#CqA!+\u0001\t\u0013\u0011Y\u000bC\u0004\u00032\u0002!\tAa-\t\u000f\t=\u0007\u0001\"\u0003\u0003R\nA\"+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012$Vm\u001d;\u000b\u0005I\u001a\u0014AB:feZ,'OC\u00015\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u001c\u0011\u0005aZT\"A\u001d\u000b\u0003i\nQa]2bY\u0006L!\u0001P\u001d\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}Q\tq\b\u0005\u0002A\u00015\t\u0011'\u0001\u0003ucA\u0004T#A\"\u0011\u0005\u0011cU\"A#\u000b\u0005\u0019;\u0015AB2p[6|gN\u0003\u00025\u0011*\u0011\u0011JS\u0001\u0007CB\f7\r[3\u000b\u0003-\u000b1a\u001c:h\u0013\tiUI\u0001\bU_BL7\rU1si&$\u0018n\u001c8\u0002\u000bQ\f\u0004\u000f\r\u0011\u0002\tQ\f\u0004/M\u0001\u0006iF\u0002\u0018\u0007I\u0001\u0005iJ\u0002\u0018'A\u0003ueA\f\u0004%\u0001\bce>\\WM]#oIB{\u0017N\u001c;\u0016\u0003U\u0003\"AV-\u000e\u0003]S!\u0001W\u001a\u0002\u000f\rdWo\u001d;fe&\u0011!l\u0016\u0002\u000f\u0005J|7.\u001a:F]\u0012\u0004v.\u001b8u\u0003=\u0011'o\\6fe\u0016sG\rU8j]R\u0004\u0013\u0001\u00054bS2,G\rU1si&$\u0018n\u001c8t+\u0005q\u0006C\u0001!`\u0013\t\u0001\u0017G\u0001\tGC&dW\r\u001a)beRLG/[8og\u0006\tb-Y5mK\u0012\u0004\u0016M\u001d;ji&|gn\u001d\u0011\u0002#%t\u0017\u000e^5bY\u001a+Go\u00195Ti\u0006$X\rF\u0002eO2\u0004\"\u0001Q3\n\u0005\u0019\f$!E%oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uK\")\u0001\u000e\u0004a\u0001S\u0006Ya-\u001a;dQ>3gm]3u!\tA$.\u0003\u0002ls\t!Aj\u001c8h\u0011\u001diG\u0002%AA\u00029\f1\u0002\\3bI\u0016\u0014X\t]8dQB\u0011\u0001h\\\u0005\u0003af\u00121!\u00138u\u0003mIg.\u001b;jC24U\r^2i'R\fG/\u001a\u0013eK\u001a\fW\u000f\u001c;%eU\t1O\u000b\u0002oi.\nQ\u000f\u0005\u0002ww6\tqO\u0003\u0002ys\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0003uf\n!\"\u00198o_R\fG/[8o\u0013\taxOA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\fqa\u00197fC:,\b\u000fF\u0001��!\rA\u0014\u0011A\u0005\u0004\u0003\u0007I$\u0001B+oSRD3ADA\u0004!\u0011\tI!a\u0006\u000e\u0005\u0005-!\u0002BA\u0007\u0003\u001f\t1!\u00199j\u0015\u0011\t\t\"a\u0005\u0002\u000f),\b/\u001b;fe*\u0019\u0011Q\u0003&\u0002\u000b),h.\u001b;\n\t\u0005e\u00111\u0002\u0002\n\u0003\u001a$XM]#bG\"\f!d\u0019:fCR,'+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012$\"$a\b\u0002&\u0005}\u00121IA$\u0003#\n\u0019&!\u0018\u0002l\u0005m\u0014QQAP\u0003W\u00032\u0001QA\u0011\u0013\r\t\u0019#\r\u0002\u0015%\u0016\u0004H.[2b\r\u0016$8\r[3s)\"\u0014X-\u00193\t\u000f\u0005\u001dr\u00021\u0001\u0002*\u0005!a.Y7f!\u0011\tY#!\u000f\u000f\t\u00055\u0012Q\u0007\t\u0004\u0003_ITBAA\u0019\u0015\r\t\u0019$N\u0001\u0007yI|w\u000e\u001e \n\u0007\u0005]\u0012(\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003w\tiD\u0001\u0004TiJLgn\u001a\u0006\u0004\u0003oI\u0004BBA!\u001f\u0001\u0007a.A\u0005gKR\u001c\u0007.\u001a:JI\"1\u0011QI\bA\u0002U\u000bAb]8ve\u000e,'I]8lKJDq!!\u0013\u0010\u0001\u0004\tY%\u0001\u0007ce>\\WM]\"p]\u001aLw\rE\u0002A\u0003\u001bJ1!a\u00142\u0005-Y\u0015MZ6b\u0007>tg-[4\t\u000bq{\u0001\u0019\u00010\t\u000f\u0005Us\u00021\u0001\u0002X\u0005Q!/\u001a9mS\u000e\fWj\u001a:\u0011\u0007\u0001\u000bI&C\u0002\u0002\\E\u0012aBU3qY&\u001c\u0017-T1oC\u001e,'\u000fC\u0004\u0002`=\u0001\r!!\u0019\u0002\u000f5,GO]5dgB!\u00111MA4\u001b\t\t)GC\u0002\u0002`\u0015KA!!\u001b\u0002f\t9Q*\u001a;sS\u000e\u001c\bbBA7\u001f\u0001\u0007\u0011qN\u0001\u0005i&lW\r\u0005\u0003\u0002r\u0005]TBAA:\u0015\r\t)(R\u0001\u0006kRLGn]\u0005\u0005\u0003s\n\u0019H\u0001\u0003US6,\u0007bBA?\u001f\u0001\u0007\u0011qP\u0001\u0006cV|G/\u0019\t\u0004\u0001\u0006\u0005\u0015bAABc\ta!+\u001a9mS\u000e\f\u0017+^8uC\"9\u0011qQ\bA\u0002\u0005%\u0015\u0001\u0005;jKJ\u001cF/\u0019;f\r\u0016$8\r[3s!\u0015A\u00141RAH\u0013\r\ti)\u000f\u0002\u0007\u001fB$\u0018n\u001c8\u0011\t\u0005E\u00151T\u0007\u0003\u0003'SA!!&\u0002\u0018\u00069a-\u001a;dQ\u0016\u0014(bAAMg\u0005!A/[3s\u0013\u0011\ti*a%\u0003!QKWM]*uCR,g)\u001a;dQ\u0016\u0014\b\"CAQ\u001fA\u0005\t\u0019AAR\u0003iaW-\u00193fe\u0016sG\r]8j]R\u0014En\\2lS:<7+\u001a8e!\u0015A\u00141RAS!\r\u0001\u0015qU\u0005\u0004\u0003S\u000b$\u0001\u0004\"m_\u000e\\\u0017N\\4TK:$\u0007\"CAW\u001fA\u0005\t\u0019AAX\u00035awnZ\"p]R,\u0007\u0010^(qiB)\u0001(a#\u00022B!\u0011\u0011OAZ\u0013\u0011\t),a\u001d\u0003\u00151{wmQ8oi\u0016DH/A\u0013de\u0016\fG/\u001a*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\r\n3fM\u0006,H\u000e\u001e\u00132cU\u0011\u00111\u0018\u0016\u0004\u0003G#\u0018!J2sK\u0006$XMU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$G\u0005Z3gCVdG\u000fJ\u00193+\t\t\tMK\u0002\u00020R\f\u0001f\u001d5pk2$7+\u001a8e\u0019\u0006$Xm\u001d;SKF,Xm\u001d;WKJ\u001c\u0018n\u001c8t\u0005f$UMZ1vYRD3AEAd!\u0011\tI!!3\n\t\u0005-\u00171\u0002\u0002\u0005)\u0016\u001cH/\u0001 uKN$h)\u001a;dQ2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f^%g\u0019\u0006\u001cH/\u00129pG\"$UMZ5oK\u00124uN]*p[\u0016\u0004\u0016M\u001d;ji&|gn\u001d\u0015\u0004'\u0005\u001d\u0017!F1tg\u0016\u0014H\u000fU1si&$\u0018n\u001c8Ti\u0006$Xm\u001d\u000b\n\u007f\u0006U\u0017Q\\At\u0003WDq!!&\u0015\u0001\u0004\t9\u000eE\u0002A\u00033L1!a72\u0005U\t%m\u001d;sC\u000e$h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012Dq!a8\u0015\u0001\u0004\t\t/A\u000btQ>,H\u000e\u001a\"f%\u0016\fG-\u001f$pe\u001a+Go\u00195\u0011\u0007a\n\u0019/C\u0002\u0002ff\u0012qAQ8pY\u0016\fg\u000eC\u0004\u0002jR\u0001\r!!9\u0002+MDw.\u001e7e\u0005\u0016$&/\u001e8dCRLgn\u001a'pO\"9\u0011Q\u001e\u000bA\u0002\u0005\u0005\u0018aD:i_VdGMQ3EK2\f\u00170\u001a3\u0002KMDw.\u001e7e\u0011\u0006tG\r\\3Fq\u000e,\u0007\u000f^5p]\u001a\u0013x.\u001c\"m_\u000e\\\u0017N\\4TK:$\u0007fA\u000b\u0002H\u0006\u00195\u000f[8vY\u00124U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"|e\u000e\\=JM2+\u0017\rZ3s\u000bB|7\r[&o_^tGk\u001c\"pi\"L%\r\u001d\u001a7Q\r1\u0012qY\u00019g\"|W\u000f\u001c3O_R4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"<\u0016\u000e\u001e5UeVt7-\u0019;f\u001f:4U\r^2iQ\r9\u0012qY\u0001#m\u0016\u0014\u0018NZ=GKR\u001c\u0007\u000eT3bI\u0016\u0014X\t]8dQ>sg)\u001b:ti\u001a+Go\u00195\u0015\u000b}\fyP!\u0004\t\u000f\t\u0005\u0001\u00041\u0001\u0003\u0004\u0005\u0019\u0011N\u00199\u0011\t\t\u0015!\u0011B\u0007\u0003\u0005\u000fQ1!!\u00044\u0013\u0011\u0011YAa\u0002\u0003\u0015\u0005\u0003\u0018NV3sg&|g\u000e\u0003\u0005\u0003\u0010a\u0001\n\u00111\u0001o\u0003=)\u0007o\\2i\r\u0016$8\r[\"pk:$\u0018\u0001\f<fe&4\u0017PR3uG\"dU-\u00193fe\u0016\u0003xn\u00195P]\u001aK'o\u001d;GKR\u001c\u0007\u000e\n3fM\u0006,H\u000e\u001e\u00133\u0003e)\u0007\u0010]3di6\u000b'o\u001b*fa2L7-\u0019+ie>$H\u000f\\3\u0015\u000b}\u00149Ba\u0007\t\u000f\te!\u00041\u0001\u0002X\u0005q!/\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\b\u0002\u0003B\u000f5A\u0005\t\u0019\u00018\u0002\u000bQLW.Z:\u0002G\u0015D\b/Z2u\u001b\u0006\u00148NU3qY&\u001c\u0017\r\u00165s_R$H.\u001a\u0013eK\u001a\fW\u000f\u001c;%e\u0005i2\u000f[8vY\u0012$\u0006N]8ui2,gi\u001c7m_^,'OU3qY&\u001c\u0017\rK\u0002\u001d\u0003\u000f\f\u0001\u0005^3ti\u001a{G\u000e\\8xKJL5\u000f\u00165s_R$H.\u001a3P]2{w\u000fR5tW\"\u001aQ$a2\u0002iMDw.\u001e7e)J,hnY1uKR{wJ\u001a4tKR\u001c\u0006/Z2jM&,G-\u00138Fa>\u001c\u0007n\u00144gg\u0016$(+Z:q_:\u001cX\rK\u0002\u001f\u0003\u000f\fQj\u001d5pk2$GK];oG\u0006$X\rV8PM\u001a\u001cX\r^*qK\u000eLg-[3e\u0013:,\u0005o\\2i\u001f\u001a47/\u001a;SKN\u0004xN\\:f\u0013\u001a4u\u000e\u001c7po\u0016\u0014\b*Y:O_6{'/Z#q_\u000eD7\u000fK\u0002 \u0003\u000f\f!j\u001d5pk2$g)\u001a;dQ2+\u0017\rZ3s\u000bB|7\r[*fG>tG\rV5nK&3G*Z1eKJ\u0014V\r\u001d7jKN<\u0016\u000e\u001e5Fa>\u001c\u0007NT8u\u0017:|wO\u001c+p\r>dGn\\<fe\"\u001a\u0001%a2\u0002\u0003NDw.\u001e7e)J,hnY1uK&3G*Z1eKJ\u0014V\r\u001d7jKN<\u0016\u000e\u001e5ESZ,'oZ5oO\u0016\u0003xn\u00195O_R\\en\\<o)>4u\u000e\u001c7po\u0016\u0014\bfA\u0011\u0002H\u0006iC/Z:u)J,hnY1uK>sg)\u001a;dQ\u0012{Wm\u001d(piV\u0003H-\u0019;f\u0011&<\u0007nV1uKJl\u0017M]6)\u0007\t\n9-A\u001atQ>,H\u000eZ+tK2+\u0017\rZ3s\u000b:$wJ\u001a4tKRLe-\u00138uKJ\u0014%o\\6feZ+'o]5p]\n+Gn\\<3a!\u001a1%a2\u0002\u0001NDw.\u001e7e)J,hnY1uKR{\u0017J\\5uS\u0006dg)\u001a;dQ>3gm]3u\u0013\u001adU-\u00193feJ+G/\u001e:ogVsG-\u001a4j]\u0016$wJ\u001a4tKRD3\u0001JAd\u0003E\u001a\bn\\;mIB{G\u000e\\%oI\u00164\u0017N\\5uK2L\u0018J\u001a'fC\u0012,'OU3ukJt7/\u00118z\u000bb\u001cW\r\u001d;j_:D3!JAd\u0003-\u001a\bn\\;mI6{g/\u001a)beRLG/[8og>+Ho\u00144UeVt7-\u0019;j]\u001edunZ*uCR,\u0007f\u0001\u0014\u0002H\u0006A4\u000f[8vY\u00124\u0015\u000e\u001c;feB\u000b'\u000f^5uS>t7/T1eK2+\u0017\rZ3s\tV\u0014\u0018N\\4MK\u0006$WM]#q_\u000eD'+Z9vKN$\bfA\u0014\u0002H\u0006A5\u000f[8vY\u0012\u001c\u0015\r^2i\u000bb\u001cW\r\u001d;j_:4%o\\7CY>\u001c7.\u001b8h'\u0016tGm\u00165f]NCW\u000f\u001e;j]\u001e$un\u001e8SKBd\u0017nY1GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u0015\u0004Q\u0005\u001d\u0017AJ:i_VdG-\u00169eCR,'+Z1tg&<g.\\3oi\nKH/Z:J]6+GO]5dg\"\u001a\u0011&a2\u0002\rNDw.\u001e7e\u001d>$X\u000b\u001d3bi\u0016\u0014V-Y:tS\u001etW.\u001a8u\u0005f$Xm]%o\u001b\u0016$(/[2t/\",gNT8SK\u0006\u001c8/[4o[\u0016tGo]%o!J|wM]3tg\"\u001a!&a2\u0002C9,wo\u00144gg\u0016$hi\u001c:MK\u0006$WM\u001d)beRLG/[8o%\u0016\u001cX\u000f\u001c;\u0015\u0011\t\u0005$q\u0011BF\u0005\u001b\u0003BAa\u0019\u0003\u0002:!!Q\rB>\u001d\u0011\u00119Ga\u001e\u000f\t\t%$Q\u000f\b\u0005\u0005W\u0012\u0019H\u0004\u0003\u0003n\tEd\u0002BA\u0018\u0005_J\u0011aS\u0005\u0003\u0013*K!\u0001\u000e%\n\u0005\u0019;\u0015b\u0001B=\u000b\u00069Q.Z:tC\u001e,\u0017\u0002\u0002B?\u0005\u007f\n\u0001e\u00144gg\u0016$hi\u001c:MK\u0006$WM]#q_\u000eD'+Z:q_:\u001cX\rR1uC*\u0019!\u0011P#\n\t\t\r%Q\u0011\u0002\u000f\u000bB|7\r[#oI>3gm]3u\u0015\u0011\u0011iHa \t\r\t%5\u00061\u0001D\u0003\t!\b\u000fC\u0003nW\u0001\u0007a\u000e\u0003\u0004\u0003\u0010.\u0002\r![\u0001\nK:$wJ\u001a4tKR$\"B!\u0019\u0003\u0014\nU%Q\u0015BT\u0011\u0019\u0011I\t\fa\u0001\u0007\"9!q\u0013\u0017A\u0002\te\u0015!B3se>\u0014\b\u0003\u0002BN\u0005Ck!A!(\u000b\u0007\t}U)\u0001\u0005qe>$xnY8m\u0013\u0011\u0011\u0019K!(\u0003\r\u0015\u0013(o\u001c:t\u0011\u0015iG\u00061\u0001o\u0011\u0019\u0011y\t\fa\u0001S\u0006q\u0012m]:feR\u0004&o\\2fgN\u0004\u0016M\u001d;ji&|g\u000eR1uC^CWM\u001c\u000b\u0004\u007f\n5\u0006b\u0002BX[\u0001\u0007\u0011\u0011]\u0001\u000eSN\u0014V-Y:tS\u001et\u0017N\\4\u0002\tM$XO\u0019\u000b\b\u007f\nU&q\u0018Ba\u0011\u001d\u00119L\fa\u0001\u0005s\u000b\u0011\u0002]1si&$\u0018n\u001c8\u0011\u0007Y\u0013Y,C\u0002\u0003>^\u0013\u0011\u0002U1si&$\u0018n\u001c8\t\u000f\tea\u00061\u0001\u0002X!9!1\u0019\u0018A\u0002\t\u0015\u0017a\u00017pOB!!q\u0019Bf\u001b\t\u0011IMC\u0002\u0003DNJAA!4\u0003J\nY\u0011IY:ue\u0006\u001cG\u000fT8h\u0003qY\u0017MZ6b\u0007>tg-[4O_R\u0013XO\\2bi\u0016|eNR3uG\",\"!a\u0013")
/* loaded from: input_file:kafka/server/ReplicaFetcherThreadTest.class */
public class ReplicaFetcherThreadTest {
    private final TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    private final FailedPartitions failedPartitions = new FailedPartitions();

    public TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0() {
        return this.kafka$server$ReplicaFetcherThreadTest$$t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private TopicPartition t2p1() {
        return this.t2p1;
    }

    public BrokerEndPoint brokerEndPoint() {
        return this.brokerEndPoint;
    }

    public FailedPartitions failedPartitions() {
        return this.failedPartitions;
    }

    private InitialFetchState initialFetchState(long j, int i) {
        return new InitialFetchState(new BrokerEndPoint(0, "localhost", 9092), i, j);
    }

    private int initialFetchState$default$2() {
        return 1;
    }

    @AfterEach
    public void cleanup() {
        TestUtils$.MODULE$.clearYammerMetrics();
    }

    public ReplicaFetcherThread createReplicaFetcherThread(String str, int i, BrokerEndPoint brokerEndPoint, KafkaConfig kafkaConfig, FailedPartitions failedPartitions, ReplicaManager replicaManager, Metrics metrics, Time time, ReplicaQuota replicaQuota, Option<TierStateFetcher> option, Option<BlockingSend> option2, Option<LogContext> option3) {
        ReplicaFetcherThread$ replicaFetcherThread$ = ReplicaFetcherThread$.MODULE$;
        return new ReplicaFetcherThread(str, i, brokerEndPoint, kafkaConfig, failedPartitions, replicaManager, metrics, time, replicaQuota, option2, option3, (Map) Map$.MODULE$.empty());
    }

    public Option<BlockingSend> createReplicaFetcherThread$default$11() {
        return None$.MODULE$;
    }

    public Option<LogContext> createReplicaFetcherThread$default$12() {
        return None$.MODULE$;
    }

    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1));
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.mock(ReplicaManager.class);
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay(new Object[]{replicaManager});
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), QuotaFactory$UnboundedQuota$.MODULE$, None$.MODULE$, None$.MODULE$, createReplicaFetcherThread$default$12());
        Assertions.assertEquals(ApiKeys.FETCH.latestVersion(), createReplicaFetcherThread.fetchRequestVersion());
        Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), createReplicaFetcherThread.offsetForLeaderEpochRequestVersion());
        Assertions.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), createReplicaFetcherThread.listOffsetRequestVersion());
    }

    @Test
    public void testFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).once();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).once();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(None$.MODULE$).once();
        EasyMock.expect(abstractLog.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(0L, 5))).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        stub(partition, replicaManager, abstractLog);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).times(3);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(0L, 1))})));
        assertPartitionStates(createReplicaFetcherThread, false, true, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(3, replicaFetcherMockBlockingSend.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        EasyMock.verify(new Object[]{logManager});
    }

    public void assertPartitionStates(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3) {
        new $colon.colon(kafka$server$ReplicaFetcherThreadTest$$t1p0(), new $colon.colon(t1p1(), new $colon.colon(t2p1(), Nil$.MODULE$))).foreach(topicPartition -> {
            $anonfun$assertPartitionStates$1(abstractFetcherThread, z, z2, z3, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1));
        BlockingSend blockingSend = (BlockingSend) EasyMock.createMock(BlockingSend.class);
        EasyMock.expect(blockingSend.sendRequest((AbstractRequest.Builder) EasyMock.anyObject())).andThrow(new NullPointerException()).once();
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.mock(ReplicaManager.class);
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay(new Object[]{blockingSend, replicaManager});
        Assertions.assertEquals((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))})), createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), null, None$.MODULE$, new Some(blockingSend), createReplicaFetcherThread$default$12()).fetchEpochEndOffsets((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLeaderEpoch(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p1().partition()).setLeaderEpoch(0))}))), "results from leader epoch request should have undefined offset");
        EasyMock.verify(new Object[]{blockingSend});
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBothIbp26() {
        verifyFetchLeaderEpochOnFirstFetch(KAFKA_2_6_IV0$.MODULE$, verifyFetchLeaderEpochOnFirstFetch$default$2());
    }

    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        verifyFetchLeaderEpochOnFirstFetch(ApiVersion$.MODULE$.latestVersion(), 0);
    }

    public void verifyFetchLeaderEpochOnFirstFetch(ApiVersion apiVersion, int i) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1);
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), apiVersion.version());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(0L, 5))).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).times(2);
        EasyMock.replay(new Object[]{replicaManager, logManager, partition, abstractLog});
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), QuotaFactory$UnboundedQuota$.MODULE$, None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(i, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(i, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.fetchCount());
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(i, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(3, replicaFetcherMockBlockingSend.fetchCount());
        EasyMock.verify(new Object[]{logManager});
    }

    public int verifyFetchLeaderEpochOnFirstFetch$default$2() {
        return 1;
    }

    public void expectMarkReplicaThrottle(ReplicaManager replicaManager, int i) {
        replicaManager.markFollowerReplicaThrottle();
        EasyMock.expect(BoxedUnit.UNIT).times(i);
    }

    public int expectMarkReplicaThrottle$default$2() {
        return 1;
    }

    @Test
    public void shouldThrottleFollowerReplica() {
        LazyRef lazyRef = new LazyRef();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1);
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), KAFKA_2_6_IV0$.MODULE$.version());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(0L, 5))).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        expectMarkReplicaThrottle(replicaManager, 1);
        stub(partition, replicaManager, abstractLog);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).times(2);
        EasyMock.replay(new Object[]{replicaManager, logManager, partition, abstractLog});
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setLeaderEpoch(5).setEndOffset(100L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(1).setLeaderEpoch(5).setEndOffset(1L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), Quota$2(lazyRef), None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertEquals(new Some(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{t1p1()}))), replicaFetcherMockBlockingSend.lastFetchRequest().map(builder -> {
            return CollectionConverters$.MODULE$.SetHasAsScala(builder.fetchData().keySet()).asScala();
        }));
    }

    @Test
    public void testFollowerIsThrottledOnLowDisk() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1));
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        replicaManager.markFollowerReplicaThrottle();
        EasyMock.expect(BoxedUnit.UNIT).andVoid().times(4);
        stub(partition, replicaManager, abstractLog);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).times(2);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createMock(ReplicationQuotaManager.class);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicationQuotaManager.isQuotaExceeded())).andReturn(BoxesRunTime.boxToBoolean(true)).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicationQuotaManager.isThrottled((TopicPartition) EasyMock.anyObject(TopicPartition.class)))).andReturn(BoxesRunTime.boxToBoolean(true)).anyTimes();
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(new Some(BoxesRunTime.boxToLong(42L)))).times(3);
        EasyMock.replay(new Object[]{replicaManager, logManager, partition, abstractLog, replicationQuotaManager});
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("audi", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(5).setEndOffset(100L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(5).setEndOffset(1L))}))).asJava(), brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12());
        Map$ map$ = Map$.MODULE$;
        ScalaRunTime$ scalaRunTime$ = ScalaRunTime$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0());
        PartitionFetchState$ partitionFetchState$ = PartitionFetchState$.MODULE$;
        Some some = new Some(BoxesRunTime.boxToLong(0L));
        Fetching$ fetching$ = Fetching$.MODULE$;
        None$ none$5 = None$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$2 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc2 = Predef$.MODULE$.ArrowAssoc(t1p1());
        PartitionFetchState$ partitionFetchState$2 = PartitionFetchState$.MODULE$;
        Map map = (Map) map$.apply(scalaRunTime$.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$.$minus$greater$extension(ArrowAssoc, new PartitionFetchState(0L, some, 5, None$.MODULE$, fetching$, none$5)), predef$ArrowAssoc$2.$minus$greater$extension(ArrowAssoc2, new PartitionFetchState(0L, None$.MODULE$, 5, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$))}));
        createReplicaFetcherThread.buildFetch(map);
        DiskThrottleListenerManager.registerListener$(DiskUsageBasedThrottler$.MODULE$, replicationQuotaManager);
        createReplicaFetcherThread.buildFetch(map);
        DiskThrottleListenerManager.deRegisterListener$(DiskUsageBasedThrottler$.MODULE$, replicationQuotaManager);
        createReplicaFetcherThread.buildFetch(map);
        EasyMock.verify(new Object[]{replicationQuotaManager, replicaManager});
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(200 - 1)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(200, 5))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(200)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 5, 172L))}))).asJava(), brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(0L, 1))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(156)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(172)), new StringBuilder(58).append("Expected ").append(t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(200 - 3)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(4)).andReturn(None$.MODULE$).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(200)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 4, 202L))}))).asJava(), brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(0L, 1))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(156)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(200)), new StringBuilder(55).append("Expected ").append(t2p1()).append(" to truncate to offset ").append(200).append(" (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
    }

    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(200 - 2)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(3)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(200)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 4, 143L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.fetchCount());
        replicaFetcherMockBlockingSend.setOffsetsForNextResponse(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 3, 101L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 3, 102L))}))).asJava());
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertTrue(replicaFetcherMockBlockingSend.lastUsedOffsetForLeaderEpochVersion() >= 3, "OffsetsForLeaderEpochRequest version.");
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(102)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 102 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(101)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 101 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        final KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1));
        final ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createNiceMock(Partition.class);
        final ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        ObjectRef create = ObjectRef.create(new Some(BoxesRunTime.boxToInteger(5)));
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(115L)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andAnswer(() -> {
            return (Option) create.elem;
        }).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(149L, 4))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(3)).andReturn(new Some(new OffsetAndEpoch(129L, 2))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(2)).andReturn(new Some(new OffsetAndEpoch(119L, 1))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(200)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        final ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend(Collections.emptyMap(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread(this, fromProps, replicaManager, replicationQuotaManager, replicaFetcherMockBlockingSend) { // from class: kafka.server.ReplicaFetcherThreadTest$$anon$1
            public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long j, FetchResponse.PartitionData<Records> partitionData) {
                return None$.MODULE$;
            }

            {
                BrokerEndPoint brokerEndPoint = this.brokerEndPoint();
                FailedPartitions failedPartitions = this.failedPartitions();
                Metrics metrics = new Metrics();
                SystemTime systemTime = new SystemTime();
                Some some = new Some(replicaFetcherMockBlockingSend);
                ReplicaFetcherThread$ replicaFetcherThread$ = ReplicaFetcherThread$.MODULE$;
                None$ none$5 = None$.MODULE$;
                ReplicaFetcherThread$ replicaFetcherThread$2 = ReplicaFetcherThread$.MODULE$;
                Map map = (Map) Map$.MODULE$.empty();
            }
        };
        replicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(200, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(200, 1))})));
        Set set = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0(), t1p1()}));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        set.foreach(topicPartition -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$2(replicaFetcherThread, topicPartition);
            return BoxedUnit.UNIT;
        });
        replicaFetcherMockBlockingSend.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(141L)))})));
        create.elem = new Some(BoxesRunTime.boxToInteger(4));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(140)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 140 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(141)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 141 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
        set.foreach(topicPartition2 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$3(replicaFetcherThread, topicPartition2);
            return BoxedUnit.UNIT;
        });
        replicaFetcherMockBlockingSend.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(130L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(131L)))})));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(3, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(129)), new StringBuilder(57).append("Expected to truncate to offset 129 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
        set.foreach(topicPartition3 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(replicaFetcherThread, topicPartition3);
            return BoxedUnit.UNIT;
        });
        replicaFetcherMockBlockingSend.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(120L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(121L)))})));
        create.elem = None$.MODULE$;
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(4, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(119)), new StringBuilder(57).append("Expected to truncate to offset 119 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
        set.foreach(topicPartition4 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(replicaFetcherThread, topicPartition4);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void testTruncateOnFetchDoesNotUpdateHighWatermark() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.mock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.mock(ReplicaManager.class);
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(130)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(149L, 4))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(150)).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logStartOffset())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(new BrokerTopicStats()).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.getPartitionOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).andReturn(partition).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) EasyMock.anyObject(), BoxesRunTime.unboxToBoolean(EasyMock.anyObject()))).andReturn(None$.MODULE$).anyTimes();
        partition.truncateTo(140L, false);
        EasyMock.expect(BoxedUnit.UNIT).once();
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicationQuotaManager.isThrottled((TopicPartition) EasyMock.anyObject()))).andReturn(BoxesRunTime.boxToBoolean(false)).anyTimes();
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicationQuotaManager.isQuotaExceeded())).andReturn(BoxesRunTime.boxToBoolean(false)).anyTimes();
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend(Collections.emptyMap(), brokerEndPoint(), new SystemTime());
        BrokerEndPoint brokerEndPoint = brokerEndPoint();
        FailedPartitions failedPartitions = failedPartitions();
        Metrics metrics = new Metrics();
        SystemTime systemTime = new SystemTime();
        Some some = new Some(replicaFetcherMockBlockingSend);
        ReplicaFetcherThread$ replicaFetcherThread$ = ReplicaFetcherThread$.MODULE$;
        None$ none$5 = None$.MODULE$;
        ReplicaFetcherThread$ replicaFetcherThread$2 = ReplicaFetcherThread$.MODULE$;
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("fetcher-thread", 0, brokerEndPoint, fromProps, failedPartitions, replicaManager, metrics, systemTime, replicationQuotaManager, some, none$5, (Map) Map$.MODULE$.empty());
        replicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(150, 1))})));
        replicaFetcherMockBlockingSend.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new FetchResponse.PartitionData(Errors.NONE, 160L, 0L, 0L, Optional.empty(), Collections.emptyList(), Optional.of(new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L)), MemoryRecords.EMPTY))})));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        EasyMock.verify(new Object[]{partition, abstractLog});
    }

    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1);
        createBrokerConfig.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(200 - 2)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(3)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(200)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), -1, 143L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertEquals(0, replicaFetcherMockBlockingSend.lastUsedOffsetForLeaderEpochVersion(), "OffsetsForLeaderEpochRequest version.");
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, replicaFetcherMockBlockingSend.epochFetchCount());
        Assertions.assertEquals(2, replicaFetcherMockBlockingSend.fetchCount());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(155)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 155 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(newCapture.getValues()).asScala().contains(BoxesRunTime.boxToInteger(143)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 143 (truncation offsets: ").append(newCapture.getValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(100)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).times(2);
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, -1L))}))).asJava(), brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(100, 1))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(100, BoxesRunTime.unboxToLong(newCapture.getValue()));
    }

    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(100)).anyTimes();
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(300, 5))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(300)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        java.util.Map asJava = CollectionConverters$.MODULE$.MutableMapHasAsJava((scala.collection.mutable.Map) scala.collection.mutable.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.NOT_LEADER_OR_FOLLOWER, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))).asJava();
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend(asJava, brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(i -> {
            createReplicaFetcherThread.doWork();
        });
        Assertions.assertEquals(0, newCapture.getValues().size());
        asJava.put(kafka$server$ReplicaFetcherThreadTest$$t1p0(), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 156L));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(156L, BoxesRunTime.unboxToLong(newCapture.getValue()));
    }

    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);
        partition.truncateTo(0L, false);
        EasyMock.expect(BoxedUnit.UNIT).times(2);
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(4))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(0L, 4))).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 4, 1L))}))).asJava(), brokerEndPoint(), new SystemTime())), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        Assertions.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), createReplicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).map(partitionFetchState -> {
            return partitionFetchState.state();
        }));
        Assertions.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), createReplicaFetcherThread.fetchState(t1p1()).map(partitionFetchState2 -> {
            return partitionFetchState2.state();
        }));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), createReplicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).map(partitionFetchState3 -> {
            return partitionFetchState3.state();
        }));
        Assertions.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), createReplicaFetcherThread.fetchState(t1p1()).map(partitionFetchState4 -> {
            return partitionFetchState4.state();
        }));
    }

    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).once();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(100 - 2)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(100, 5))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(100)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicationQuotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference(None$.MODULE$)).anyTimes();
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 52L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 49L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(replicaFetcherMockBlockingSend), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = kafka$server$ReplicaFetcherThreadTest$$t1p0();
        replicaFetcherMockBlockingSend.setEpochRequestCallback(() -> {
            createReplicaFetcherThread.removePartitions((scala.collection.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0})));
        });
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(49L, BoxesRunTime.unboxToLong(newCapture.getValue()));
    }

    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1));
        BlockingSend blockingSend = (BlockingSend) EasyMock.createMock(BlockingSend.class);
        blockingSend.initiateClose();
        EasyMock.expect(BoxedUnit.UNIT).andThrow(new IllegalArgumentException()).once();
        blockingSend.close();
        EasyMock.expect(BoxedUnit.UNIT).andThrow(new IllegalStateException()).once();
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.mock(ReplicaManager.class);
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay(new Object[]{blockingSend, replicaManager});
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), null, None$.MODULE$, new Some(blockingSend), createReplicaFetcherThread$default$12());
        createReplicaFetcherThread.start();
        createReplicaFetcherThread.initiateShutdown();
        createReplicaFetcherThread.awaitShutdown();
        EasyMock.verify(new Object[]{blockingSend});
    }

    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        assertProcessPartitionDataWhen(true);
    }

    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        assertProcessPartitionDataWhen(false);
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, int i, long j) {
        return newOffsetForLeaderPartitionResult(topicPartition, Errors.NONE, i, j);
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, Errors errors, int i, long j) {
        return new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(topicPartition.partition()).setErrorCode(errors.code()).setLeaderEpoch(i).setEndOffset(j);
    }

    private void assertProcessPartitionDataWhen(boolean z) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1));
        BlockingSend blockingSend = (BlockingSend) EasyMock.createNiceMock(BlockingSend.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createNiceMock(Partition.class);
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog);
        EasyMock.expect(BoxesRunTime.boxToBoolean(partition.isReassigning())).andReturn(BoxesRunTime.boxToBoolean(z));
        EasyMock.expect(BoxesRunTime.boxToBoolean(partition.isAddingLocalReplica())).andReturn(BoxesRunTime.boxToBoolean(z));
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);
        EasyMock.expect(replicaManager.getPartitionOrException((TopicPartition) EasyMock.anyObject())).andReturn(partition);
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(brokerTopicStats).anyTimes();
        ReplicaQuota replicaQuota = (ReplicaQuota) EasyMock.createNiceMock(ReplicaQuota.class);
        EasyMock.replay(new Object[]{blockingSend, replicaManager, partition, abstractLog, replicaQuota});
        createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicaQuota, None$.MODULE$, new Some(blockingSend), createReplicaFetcherThread$default$12()).processPartitionData(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 0L, new FetchResponse.PartitionData(Errors.NONE, 0L, 0L, 0L, Optional.empty(), Collections.emptyList(), MemoryRecords.withRecords((byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))})));
        if (z) {
            Assertions.assertEquals(r0.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        } else {
            Assertions.assertEquals(0L, ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        }
        Assertions.assertEquals(r0.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().replicationBytesInRate().get()).count());
    }

    public void stub(Partition partition, ReplicaManager replicaManager, AbstractLog abstractLog) {
        EasyMock.expect(replicaManager.localLogOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.getPartitionOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).andReturn(partition).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(t1p1())).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.getPartitionOrException(t1p1())).andReturn(partition).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(t2p1())).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.getPartitionOrException(t2p1())).andReturn(partition).anyTimes();
    }

    private KafkaConfig kafkaConfigNoTruncateOnFetch() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1);
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), KAFKA_2_6_IV0$.MODULE$.version());
        return KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
    }

    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3, TopicPartition topicPartition) {
        Assertions.assertTrue(abstractFetcherThread.fetchState(topicPartition).isDefined());
        PartitionFetchState partitionFetchState = (PartitionFetchState) abstractFetcherThread.fetchState(topicPartition).get();
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z), BoxesRunTime.boxToBoolean(partitionFetchState.isReadyForFetch()), new StringBuilder(39).append("Partition ").append(topicPartition).append(" should").append((Object) (!z ? " NOT" : "")).append(" be ready for fetching").toString());
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z2), BoxesRunTime.boxToBoolean(partitionFetchState.isTruncating()), new StringBuilder(39).append("Partition ").append(topicPartition).append(" should").append((Object) (!z2 ? " NOT" : "")).append(" be truncating its log").toString());
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z3), BoxesRunTime.boxToBoolean(partitionFetchState.isDelayed()), new StringBuilder(28).append("Partition ").append(topicPartition).append(" should").append((Object) (!z3 ? " NOT" : "")).append(" be delayed").toString());
    }

    private final /* synthetic */ ReplicaFetcherThreadTest$Quota$1$ Quota$lzycompute$1(LazyRef lazyRef) {
        ReplicaFetcherThreadTest$Quota$1$ replicaFetcherThreadTest$Quota$1$;
        synchronized (lazyRef) {
            replicaFetcherThreadTest$Quota$1$ = lazyRef.initialized() ? (ReplicaFetcherThreadTest$Quota$1$) lazyRef.value() : (ReplicaFetcherThreadTest$Quota$1$) lazyRef.initialize(new ReplicaQuota(this) { // from class: kafka.server.ReplicaFetcherThreadTest$Quota$1$
                private final /* synthetic */ ReplicaFetcherThreadTest $outer;

                public boolean isThrottled(TopicPartition topicPartition) {
                    TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = this.$outer.kafka$server$ReplicaFetcherThreadTest$$t1p0();
                    return topicPartition == null ? kafka$server$ReplicaFetcherThreadTest$$t1p0 == null : topicPartition.equals(kafka$server$ReplicaFetcherThreadTest$$t1p0);
                }

                public boolean isQuotaExceeded() {
                    return true;
                }

                public void record(long j) {
                }

                {
                    if (this == null) {
                        throw null;
                    }
                    this.$outer = this;
                }
            });
        }
        return replicaFetcherThreadTest$Quota$1$;
    }

    private final ReplicaFetcherThreadTest$Quota$1$ Quota$2(LazyRef lazyRef) {
        return lazyRef.initialized() ? (ReplicaFetcherThreadTest$Quota$1$) lazyRef.value() : Quota$lzycompute$1(lazyRef);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$2(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    private static final FetchResponse.PartitionData partitionData$1(FetchResponseData.EpochEndOffset epochEndOffset) {
        return new FetchResponse.PartitionData(Errors.NONE, 0L, 0L, 0L, Optional.empty(), Collections.emptyList(), Optional.of(epochEndOffset), MemoryRecords.EMPTY);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$3(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }
}
