package kafka.server;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.ReplicaAlterLogDirsThread;
import kafka.server.metadata.ZkMetadataCache;
import kafka.server.metadata.ZkMetadataCache$;
import kafka.utils.DelayedItem;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.server.common.DirectoryEventHandler;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
import org.apache.kafka.storage.internals.log.FetchPartitionStats;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import scala.Function1;
import scala.Int$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOps;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqFactory;
import scala.collection.SeqFactory$UnapplySeqWrapper$;
import scala.collection.SeqOps;
import scala.collection.Set;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.math.Numeric$IntIsIntegral$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ReplicaAlterLogDirsThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\t\u001df\u0001B\u0017/\u0001MBQA\u000f\u0001\u0005\u0002mBqA\u0010\u0001C\u0002\u0013%q\b\u0003\u0004L\u0001\u0001\u0006I\u0001\u0011\u0005\b\u0019\u0002\u0011\r\u0011\"\u0003@\u0011\u0019i\u0005\u0001)A\u0005\u0001\"9a\n\u0001b\u0001\n\u0013y\u0005BB*\u0001A\u0003%\u0001\u000bC\u0004U\u0001\t\u0007I\u0011B+\t\r\u0019\u0004\u0001\u0015!\u0003W\u0011\u001d9\u0007A1A\u0005\n!DaA\u001b\u0001!\u0002\u0013I\u0007bB6\u0001\u0005\u0004%I\u0001\u001c\u0005\u0007a\u0002\u0001\u000b\u0011B7\t\u000fE\u0004!\u0019!C\u0005e\"1a\u000f\u0001Q\u0001\nMDqa\u001e\u0001C\u0002\u0013%\u0001\u0010\u0003\u0004}\u0001\u0001\u0006I!\u001f\u0005\b{\u0002\u0011\r\u0011\"\u0003\u007f\u0011\u001d\ty\u0002\u0001Q\u0001\n}D\u0011\"!\t\u0001\u0005\u0004%I!a\t\t\u0011\u0005E\u0002\u0001)A\u0005\u0003KA\u0011\"a\r\u0001\u0005\u0004%I!!\u000e\t\u0011\u0005\r\u0003\u0001)A\u0005\u0003oAq!!\u0012\u0001\t\u0013\t9\u0005C\u0005\u0002d\u0001\t\n\u0011\"\u0003\u0002f!9\u00111\u0010\u0001\u0005\u0002\u0005u\u0004bBAN\u0001\u0011\u0005\u0011Q\u0010\u0005\b\u0003?\u0003A\u0011AA?\u0011\u001d\t\u0019\u000b\u0001C\u0005\u0003KCq!a7\u0001\t\u0003\ti\bC\u0004\u0002`\u0002!\t!! \t\u000f\u0005\r\b\u0001\"\u0001\u0002~!9\u0011q\u001d\u0001\u0005\n\u0005%\bb\u0002B\u0013\u0001\u0011\u0005\u0011Q\u0010\u0005\b\u0005S\u0001A\u0011AA?\u0011\u001d\u0011i\u0003\u0001C\u0001\u0003{BqA!\r\u0001\t\u0003\ti\bC\u0004\u00036\u0001!\t!! \t\u000f\te\u0002\u0001\"\u0001\u0002~!9!Q\b\u0001\u0005\u0002\u0005u\u0004b\u0002B!\u0001\u0011\u0005\u0011Q\u0010\u0005\b\u0005\u000b\u0002A\u0011AA?\u0011\u001d\u0011I\u0005\u0001C\u0001\u0005\u0017BqA!\u001e\u0001\t\u0003\u00119HA\u000fSKBd\u0017nY1BYR,'\u000fT8h\t&\u00148\u000f\u00165sK\u0006$G+Z:u\u0015\ty\u0003'\u0001\u0004tKJ4XM\u001d\u0006\u0002c\u0005)1.\u00194lC\u000e\u00011C\u0001\u00015!\t)\u0004(D\u00017\u0015\u00059\u0014!B:dC2\f\u0017BA\u001d7\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012\u0001\u0010\t\u0003{\u0001i\u0011AL\u0001\u0005iF\u0002\b'F\u0001A!\t\t\u0015*D\u0001C\u0015\t\u0019E)\u0001\u0004d_6lwN\u001c\u0006\u0003c\u0015S!AR$\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005A\u0015aA8sO&\u0011!J\u0011\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u0003\u0015!\u0018\u0007\u001d\u0019!\u0003\u0011!\u0018\u0007]\u0019\u0002\u000bQ\f\u0004/\r\u0011\u0002\u000fQ|\u0007/[2JIV\t\u0001\u000b\u0005\u0002B#&\u0011!K\u0011\u0002\u0005+VLG-\u0001\u0005u_BL7-\u00133!\u0003!!x\u000e]5d\u0013\u0012\u001cX#\u0001,\u0011\t]cf\fU\u0007\u00021*\u0011\u0011LW\u0001\nS6lW\u000f^1cY\u0016T!a\u0017\u001c\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002^1\n\u0019Q*\u00199\u0011\u0005}#W\"\u00011\u000b\u0005\u0005\u0014\u0017\u0001\u00027b]\u001eT\u0011aY\u0001\u0005U\u00064\u0018-\u0003\u0002fA\n11\u000b\u001e:j]\u001e\f\u0011\u0002^8qS\u000eLEm\u001d\u0011\u0002\u0015Q|\u0007/[2OC6,7/F\u0001j!\u00119F\f\u00150\u0002\u0017Q|\u0007/[2OC6,7\u000fI\u0001\u0007i&$\u0017\u0007\u001d\u0019\u0016\u00035\u0004\"!\u00118\n\u0005=\u0014%\u0001\u0005+pa&\u001c\u0017\n\u001a)beRLG/[8o\u0003\u001d!\u0018\u000eZ\u0019qa\u0001\n\u0001CZ1jY\u0016$\u0007+\u0019:uSRLwN\\:\u0016\u0003M\u0004\"!\u0010;\n\u0005Ut#\u0001\u0005$bS2,G\rU1si&$\u0018n\u001c8t\u0003E1\u0017-\u001b7fIB\u000b'\u000f^5uS>t7\u000fI\u0001\u0011a\u0006,8/\u001a3QCJ$\u0018\u000e^5p]N,\u0012!\u001f\t\u0003{iL!a\u001f\u0018\u0003!A\u000bWo]3e!\u0006\u0014H/\u001b;j_:\u001c\u0018!\u00059bkN,G\rU1si&$\u0018n\u001c8tA\u0005y\u0001/\u0019:uSRLwN\\*uCR,7/F\u0001��!\u0019\t\t!a\u0002\u0002\f5\u0011\u00111\u0001\u0006\u0004\u0003\u000b\u0011\u0017\u0001B;uS2LA!!\u0003\u0002\u0004\t!A*[:u!\u0011\ti!!\u0007\u000f\t\u0005=\u0011QC\u0007\u0003\u0003#Q1!a\u0005C\u0003\u001diWm]:bO\u0016LA!a\u0006\u0002\u0012\u0005IR\u000b\u001d3bi\u0016lU\r^1eCR\f'+Z9vKN$H)\u0019;b\u0013\u0011\tY\"!\b\u00039U\u0003H-\u0019;f\u001b\u0016$\u0018\rZ1uCB\u000b'\u000f^5uS>t7\u000b^1uK*!\u0011qCA\t\u0003A\u0001\u0018M\u001d;ji&|gn\u0015;bi\u0016\u001c\b%A\u000bva\u0012\fG/Z'fi\u0006$\u0017\r^1SKF,Xm\u001d;\u0016\u0005\u0005\u0015\u0002\u0003BA\u0014\u0003[i!!!\u000b\u000b\u0007\u0005-\")\u0001\u0005sKF,Xm\u001d;t\u0013\u0011\ty#!\u000b\u0003+U\u0003H-\u0019;f\u001b\u0016$\u0018\rZ1uCJ+\u0017/^3ti\u00061R\u000f\u001d3bi\u0016lU\r^1eCR\f'+Z9vKN$\b%A\u0007nKR\fG-\u0019;b\u0007\u0006\u001c\u0007.Z\u000b\u0003\u0003o\u0001B!!\u000f\u0002@5\u0011\u00111\b\u0006\u0004\u0003{q\u0013\u0001C7fi\u0006$\u0017\r^1\n\t\u0005\u0005\u00131\b\u0002\u00105.lU\r^1eCR\f7)Y2iK\u0006qQ.\u001a;bI\u0006$\u0018mQ1dQ\u0016\u0004\u0013!E5oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uKR1\u0011\u0011JA(\u00033\u00022!PA&\u0013\r\tiE\f\u0002\u0012\u0013:LG/[1m\r\u0016$8\r[*uCR,\u0007bBA)1\u0001\u0007\u00111K\u0001\fM\u0016$8\r[(gMN,G\u000fE\u00026\u0003+J1!a\u00167\u0005\u0011auN\\4\t\u0013\u0005m\u0003\u0004%AA\u0002\u0005u\u0013a\u00037fC\u0012,'/\u00129pG\"\u00042!NA0\u0013\r\t\tG\u000e\u0002\u0004\u0013:$\u0018aG5oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uK\u0012\"WMZ1vYR$#'\u0006\u0002\u0002h)\"\u0011QLA5W\t\tY\u0007\u0005\u0003\u0002n\u0005]TBAA8\u0015\u0011\t\t(a\u001d\u0002\u0013Ut7\r[3dW\u0016$'bAA;m\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005e\u0014q\u000e\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017\u0001L:i_VdGMT8u\u0003\u0012$\u0007+\u0019:uSRLwN\\%g\rV$XO]3M_\u001eL5OT8u\t\u00164\u0017N\\3e)\t\ty\bE\u00026\u0003\u0003K1!a!7\u0005\u0011)f.\u001b;)\u0007i\t9\t\u0005\u0003\u0002\n\u0006]UBAAF\u0015\u0011\ti)a$\u0002\u0007\u0005\u0004\u0018N\u0003\u0003\u0002\u0012\u0006M\u0015a\u00026va&$XM\u001d\u0006\u0004\u0003+;\u0015!\u00026v]&$\u0018\u0002BAM\u0003\u0017\u0013A\u0001V3ti\u0006a3\u000f[8vY\u0012,\u0006\u000fZ1uK2+\u0017\rZ3s\u000bB|7\r[!gi\u0016\u0014h)\u001a8dK\u0012,\u0005o\\2i\u000bJ\u0014xN\u001d\u0015\u00047\u0005\u001d\u0015AJ:i_VdGMU3qY\u0006\u001cWmQ;se\u0016tG\u000fT8h\t&\u0014x\u000b[3o\u0007\u0006,x\r\u001b;Va\"\u001aA$a\"\u0002/U\u0004H-\u0019;f%\u0016\f7o]5h]6,g\u000e^*uCR,G\u0003CAT\u0003[\u000b9,a/\u0011\u000bU\nI+a \n\u0007\u0005-fG\u0001\u0004PaRLwN\u001c\u0005\b\u0003_k\u0002\u0019AAY\u0003\u0019!\bN]3bIB\u0019Q(a-\n\u0007\u0005UfFA\rSKBd\u0017nY1BYR,'\u000fT8h\t&\u00148\u000f\u00165sK\u0006$\u0007bBA];\u0001\u0007\u0011QL\u0001\fa\u0006\u0014H/\u001b;j_:LE\rC\u0004\u0002>v\u0001\r!a0\u0002\u00119,wo\u0015;bi\u0016\u0004B!!1\u0002V:!\u00111YAi\u001d\u0011\t)-a4\u000f\t\u0005\u001d\u0017QZ\u0007\u0003\u0003\u0013T1!a33\u0003\u0019a$o\\8u}%\t\u0011'\u0003\u00020a%\u0019\u00111\u001b\u0018\u00023I+\u0007\u000f\\5dC\u0006cG/\u001a:M_\u001e$\u0015N]:UQJ,\u0017\rZ\u0005\u0005\u0003/\fINA\tSK\u0006\u001c8/[4o[\u0016tGo\u0015;bi\u0016T1!a5/\u0003A\u001b\bn\\;mIJ+\u0007\u000f\\1dK\u000e+(O]3oi2{w\rR5s/\",gnQ1vO\"$X\u000b],ji\"\fe\r^3s\u0003N\u001c\u0018n\u001a8nK:$(+Z9vKN$\b*Y:CK\u0016t7i\\7qY\u0016$X\r\u001a\u0015\u0004=\u0005\u001d\u0015\u0001Q:i_VdGMU3wKJ$\u0018I\\=TG\",G-\u001e7fI\u0006\u001b8/[4o[\u0016tGOU3rk\u0016\u001cH/\u00134BgNLwM\\7f]RL5oQ1oG\u0016dG.\u001a3)\u0007}\t9)A\u001ftQ>,H\u000e\u001a*fm\u0016\u0014HOU3bgNLwM\\7f]R\u001chi\u001c:J]\u000e|W\u000e\u001d7fi\u00164U\u000f^;sKJ+\u0007\u000f\\5dCB\u0013x.\\8uS>t7\u000fK\u0002!\u0003\u000f\u000bq#\\8dW\u001a+Go\u00195Ge>l7)\u001e:sK:$Hj\\4\u0015\u0019\u0005}\u00141^Ax\u0003s\u0014\u0019A!\u0004\t\r\u00055\u0018\u00051\u0001n\u0003A!x\u000e]5d\u0013\u0012\u0004\u0016M\u001d;ji&|g\u000eC\u0004\u0002r\u0006\u0002\r!a=\u0002\u0017I,\u0017/^3ti\u0012\u000bG/\u0019\t\u0004{\u0005U\u0018bAA|]\t1\u0002+\u0019:uSRLwN\u001c$fi\u000eDW*\u001a;bI\u0006$\u0018\rC\u0004\u0002|\u0006\u0002\r!!@\u0002\r\r|gNZ5h!\ri\u0014q`\u0005\u0004\u0005\u0003q#aC&bM.\f7i\u001c8gS\u001eDqA!\u0002\"\u0001\u0004\u00119!\u0001\bsKBd\u0017nY1NC:\fw-\u001a:\u0011\u0007u\u0012I!C\u0002\u0003\f9\u0012aBU3qY&\u001c\u0017-T1oC\u001e,'\u000fC\u0004\u0003\u0010\u0005\u0002\rA!\u0005\u0002\u0019I,7\u000f]8og\u0016$\u0015\r^1\u0011\t\tM!\u0011E\u0007\u0003\u0005+QAAa\u0006\u0003\u001a\u0005\u0019An\\4\u000b\t\tm!QD\u0001\nS:$XM\u001d8bYNT1Aa\bE\u0003\u001d\u0019Ho\u001c:bO\u0016LAAa\t\u0003\u0016\t\u0011b)\u001a;dQB\u000b'\u000f^5uS>tG)\u0019;b\u0003\tJ7o];fg\u0016\u0003xn\u00195SKF,Xm\u001d;Ge>lGj\\2bYJ+\u0007\u000f\\5dC\"\u001a!%a\"\u0002{\u0019,Go\u00195Fa>\u001c\u0007n\u001d$s_6dU-\u00193feNCw.\u001e7e\u0011\u0006tG\r\\3Fq\u000e,\u0007\u000f^5p]\u001a\u0013x.\\$fi2{7-\u00197SKBd\u0017nY1)\u0007\r\n9)A\u000ftQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_J+\u0007\u000f\\5dC>3gm]3uQ\r!\u0013qQ\u0001.g\"|W\u000f\u001c3UeVt7-\u0019;f)>,e\u000eZ(gMN,Go\u00144MCJ<Wm\u001d;D_6lwN\\#q_\u000eD\u0007fA\u0013\u0002\b\u0006\t5\u000f[8vY\u0012$&/\u001e8dCR,Gk\\%oSRL\u0017\r\u001c$fi\u000eDwJ\u001a4tKRLeMU3qY&\u001c\u0017MU3ukJt7/\u00168eK\u001aLg.\u001a3PM\u001a\u001cX\r\u001e\u0015\u0004M\u0005\u001d\u0015aK:i_VdG\rU8mY&sG-\u001a4j]&$X\r\\=JMJ+\u0007\u000f\\5dC:{G/\u0011<bS2\f'\r\\3)\u0007\u001d\n9)\u0001\u0014tQ>,H\u000e\u001a$fi\u000eDG*Z1eKJ,\u0005o\\2i\u001f:4\u0015N]:u\r\u0016$8\r[(oYfD3\u0001KAD\u0003q\u0019\bn\\;mI\u001a+Go\u00195P]\u0016\u0014V\r\u001d7jG\u0006\fE/\u0011+j[\u0016D3!KAD\u00035\u001a\bn\\;mI\u001a+Go\u00195O_:$U\r\\1zK\u0012\fe\u000e\u001a(p]R\u0013XO\\2bi&twMU3qY&\u001c\u0017m\u001d\u0015\u0004U\u0005\u001d\u0015\u0001B:uk\n$B\"a \u0003N\tm#q\fB2\u0005gBqAa\u0014,\u0001\u0004\u0011\t&A\u0004m_\u001e$\u0016\u0007\u001d\u0019\u0011\t\tM#qK\u0007\u0003\u0005+R1Aa\u00061\u0013\u0011\u0011IF!\u0016\u0003\u0017\u0005\u00137\u000f\u001e:bGRdun\u001a\u0005\b\u0005;Z\u0003\u0019\u0001B)\u0003\u001dawn\u001a+2aFBqA!\u0019,\u0001\u0004\u0011\t&A\u0005gkR,(/\u001a'pO\"9!QM\u0016A\u0002\t\u001d\u0014!\u00039beRLG/[8o!\u0011\u0011IGa\u001c\u000e\u0005\t-$b\u0001B7a\u000591\r\\;ti\u0016\u0014\u0018\u0002\u0002B9\u0005W\u0012\u0011\u0002U1si&$\u0018n\u001c8\t\u000f\t\u00151\u00061\u0001\u0003\b\u0005)2\u000f^;c/&$\bNR3uG\"lUm]:bO\u0016\u001cHCDA@\u0005s\u0012YH! \u0003��\t\u0005%1\u0011\u0005\b\u0005\u001fb\u0003\u0019\u0001B)\u0011\u001d\u0011i\u0006\fa\u0001\u0005#BqA!\u0019-\u0001\u0004\u0011\t\u0006C\u0004\u0003f1\u0002\rAa\u001a\t\u000f\t\u0015A\u00061\u0001\u0003\b!9!Q\u0011\u0017A\u0002\t\u001d\u0015\u0001\u0005:fgB|gn]3DC2d'-Y2l!\u0019\u0011IIa$\u0003\u00146\u0011!1\u0012\u0006\u0004\u0005\u001b;\u0015aB7pG.LGo\\\u0005\u0005\u0005#\u0013YI\u0001\bBe\u001e,X.\u001a8u\u0007\u0006\u0004Ho\u001c:\u0011\u000fU\u0012)J!'\u0002��%\u0019!q\u0013\u001c\u0003\u0013\u0019+hn\u0019;j_:\f\u0004C\u0002BN\u0005;\u0013\t+D\u0001[\u0013\r\u0011yJ\u0017\u0002\u0004'\u0016\f\bCB\u001b\u0003$6\u0014\t\"C\u0002\u0003&Z\u0012a\u0001V;qY\u0016\u0014\u0004")
/* loaded from: input_file:kafka/server/ReplicaAlterLogDirsThreadTest.class */
public class ReplicaAlterLogDirsThreadTest {
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final Uuid topicId = Uuid.randomUuid();
    private final Map<String, Uuid> topicIds = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic1"), topicId())}));
    private final Map<Uuid, String> topicNames = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicId()), "topic1")}));
    private final TopicIdPartition tid1p0 = new TopicIdPartition(topicId(), t1p0());
    private final FailedPartitions failedPartitions = new FailedPartitions();
    private final PausedPartitions pausedPartitions = new PausedPartitions();
    private final List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates = CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic1").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), Nil$.MODULE$)).asJava();
    private final UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 0, 0, 0, Collections.emptyList(), partitionStates(), Collections.emptyList(), CollectionConverters$.MODULE$.MapHasAsJava(topicIds()).asJava(), Collections.emptyList(), false).build();
    private final ZkMetadataCache metadataCache;

    private TopicPartition t1p0() {
        return this.t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private Uuid topicId() {
        return this.topicId;
    }

    private Map<String, Uuid> topicIds() {
        return this.topicIds;
    }

    private Map<Uuid, String> topicNames() {
        return this.topicNames;
    }

    private TopicIdPartition tid1p0() {
        return this.tid1p0;
    }

    private FailedPartitions failedPartitions() {
        return this.failedPartitions;
    }

    private PausedPartitions pausedPartitions() {
        return this.pausedPartitions;
    }

    private List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates() {
        return this.partitionStates;
    }

    private UpdateMetadataRequest updateMetadataRequest() {
        return this.updateMetadataRequest;
    }

    private ZkMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private InitialFetchState initialFetchState(long j, int i) {
        return new InitialFetchState(new Some(topicId()), new BrokerEndPoint(0, "localhost", 9092), i, j, None$.MODULE$);
    }

    private int initialFetchState$default$2() {
        return 1;
    }

    @Test
    public void shouldNotAddPartitionIfFutureLogIsNotDefined() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(false));
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, new BrokerTopicStats(), DirectoryEventHandler.NOOP);
        Assertions.assertEquals(Predef$.MODULE$.Set().empty(), replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1))}))));
        Assertions.assertEquals(0, replicaAlterLogDirsThread.partitionCount());
        Assertions.assertEquals(None$.MODULE$, replicaAlterLogDirsThread.fetchState(t1p0()));
    }

    @Test
    public void shouldUpdateLeaderEpochAfterFencedEpochError() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.futureLocalLogOrException(t1p0())).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaManager.onlinePartition(t1p0())).thenReturn(new Some(partition));
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(BoxesRunTime.boxToBoolean(replicationQuotaManager.isQuotaExceeded())).thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.empty(), 5, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5).setEndOffset(0));
        Mockito.when(partition.futureLocalLogOrException()).thenReturn(abstractLog);
        ((Partition) Mockito.doNothing().when(partition)).truncateTo(0L, true);
        Mockito.when(BoxesRunTime.boxToBoolean(partition.maybeReplaceCurrentWithFutureReplica())).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(Uuid.fromString("gOZOXHnkR9eiA1W9ZuLk8A")));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(None$.MODULE$);
        mockFetchFromCurrentLog(tid1p0(), new FullPartitionFetchMetadata(topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(5 - 1)), Optional.empty(), -1L), fromProps, replicaManager, new FetchPartitionData(Errors.FENCED_LEADER_EPOCH, -1L, -1L, MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false, new FetchPartitionStats(0L, 0L)));
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, new BrokerTopicStats(), DirectoryEventHandler.NOOP);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 5 - 1))})));
        Assertions.assertTrue(replicaAlterLogDirsThread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, replicaAlterLogDirsThread.partitionCount());
        replicaAlterLogDirsThread.doWork();
        Assertions.assertTrue(failedPartitions().contains(t1p0()));
        Assertions.assertEquals(None$.MODULE$, replicaAlterLogDirsThread.fetchState(t1p0()));
        Assertions.assertEquals(0, replicaAlterLogDirsThread.partitionCount());
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 5))})));
        Assertions.assertEquals(new Some(BoxesRunTime.boxToInteger(5)), replicaAlterLogDirsThread.fetchState(t1p0()).map(partitionFetchState -> {
            return BoxesRunTime.boxToInteger(partitionFetchState.currentLeaderEpoch());
        }));
        Assertions.assertEquals(1, replicaAlterLogDirsThread.partitionCount());
        mockFetchFromCurrentLog(tid1p0(), new FullPartitionFetchMetadata(topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(5)), Optional.empty(), -1L), fromProps, replicaManager, new FetchPartitionData(Errors.NONE, 0L, 0L, MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false, new FetchPartitionStats(0L, 0L)));
        replicaAlterLogDirsThread.doWork();
        Assertions.assertFalse(failedPartitions().contains(t1p0()));
        Assertions.assertEquals(None$.MODULE$, replicaAlterLogDirsThread.fetchState(t1p0()));
        Assertions.assertEquals(0, replicaAlterLogDirsThread.partitionCount());
    }

    @Test
    public void shouldReplaceCurrentLogDirWhenCaughtUp() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.futureLocalLogOrException(t1p0())).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaManager.onlinePartition(t1p0())).thenReturn(new Some(partition));
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(BoxesRunTime.boxToBoolean(replicationQuotaManager.isQuotaExceeded())).thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.empty(), 5, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5).setEndOffset(0));
        Mockito.when(partition.futureLocalLogOrException()).thenReturn(abstractLog);
        ((Partition) Mockito.doNothing().when(partition)).truncateTo(0L, true);
        Mockito.when(BoxesRunTime.boxToBoolean(partition.maybeReplaceCurrentWithFutureReplica())).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(Uuid.fromString("PGLOjDjKQaCOXFOtxymIig")));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(None$.MODULE$);
        mockFetchFromCurrentLog(tid1p0(), new FullPartitionFetchMetadata(topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(5)), Optional.empty(), -1L), fromProps, replicaManager, new FetchPartitionData(Errors.NONE, 0L, 0L, MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false, new FetchPartitionStats(0L, 0L)));
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, new BrokerTopicStats(), DirectoryEventHandler.NOOP);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 5))})));
        Assertions.assertTrue(replicaAlterLogDirsThread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, replicaAlterLogDirsThread.partitionCount());
        replicaAlterLogDirsThread.doWork();
        Assertions.assertEquals(None$.MODULE$, replicaAlterLogDirsThread.fetchState(t1p0()));
        Assertions.assertEquals(0, replicaAlterLogDirsThread.partitionCount());
    }

    private Option<BoxedUnit> updateReassignmentState(ReplicaAlterLogDirsThread replicaAlterLogDirsThread, int i, ReplicaAlterLogDirsThread.ReassignmentState reassignmentState) {
        return topicNames().get(topicId()).map(str -> {
            $anonfun$updateReassignmentState$1(replicaAlterLogDirsThread, i, reassignmentState, str);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldReplaceCurrentLogDirWhenCaughtUpWithAfterAssignmentRequestHasBeenCompleted() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        DirectoryEventHandler directoryEventHandler = (DirectoryEventHandler) Mockito.mock(DirectoryEventHandler.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Uuid fromString = Uuid.fromString("EzI9SqkFQKW1iFc1ZwP9SQ");
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(partition.topicId()).thenReturn(new Some(topicId()));
        Mockito.when(partition.futureReplicaDirectoryId()).thenReturn(new Some(Uuid.randomUuid()));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.futureLocalLogOrException(t1p0())).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaManager.onlinePartition(t1p0())).thenReturn(new Some(partition));
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(BoxesRunTime.boxToBoolean(replicationQuotaManager.isQuotaExceeded())).thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.empty(), 5, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5).setEndOffset(0));
        Mockito.when(partition.futureLocalLogOrException()).thenReturn(abstractLog);
        ((Partition) Mockito.doNothing().when(partition)).truncateTo(0L, true);
        Mockito.when(BoxesRunTime.boxToBoolean(partition.maybeReplaceCurrentWithFutureReplica())).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToBoolean(partition.runCallbackIfFutureReplicaCaughtUp((Function1) ArgumentMatchers.any()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(fromString));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(None$.MODULE$);
        mockFetchFromCurrentLog(tid1p0(), new FullPartitionFetchMetadata(topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(5)), Optional.empty(), -1L), fromProps, replicaManager, new FetchPartitionData(Errors.NONE, 0L, 0L, MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false, new FetchPartitionStats(0L, 0L)));
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, new BrokerTopicStats(), directoryEventHandler);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 5))})));
        Assertions.assertTrue(replicaAlterLogDirsThread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, replicaAlterLogDirsThread.partitionCount());
        replicaAlterLogDirsThread.doWork();
        Assertions.assertTrue(replicaAlterLogDirsThread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, replicaAlterLogDirsThread.partitionCount());
        updateReassignmentState(replicaAlterLogDirsThread, 0, ReplicaAlterLogDirsThread$ReassignmentState$Queued$.MODULE$);
        replicaAlterLogDirsThread.doWork();
        Assertions.assertTrue(replicaAlterLogDirsThread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, replicaAlterLogDirsThread.partitionCount());
        updateReassignmentState(replicaAlterLogDirsThread, 0, ReplicaAlterLogDirsThread$ReassignmentState$Accepted$.MODULE$);
        replicaAlterLogDirsThread.doWork();
        Assertions.assertEquals(None$.MODULE$, replicaAlterLogDirsThread.fetchState(t1p0()));
        Assertions.assertEquals(0, replicaAlterLogDirsThread.partitionCount());
        Mockito.verifyNoInteractions(new Object[]{directoryEventHandler});
    }

    @Test
    public void shouldRevertAnyScheduledAssignmentRequestIfAssignmentIsCancelled() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        DirectoryEventHandler directoryEventHandler = (DirectoryEventHandler) Mockito.mock(DirectoryEventHandler.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(partition.topicId()).thenReturn(new Some(topicId()));
        Mockito.when(partition.futureReplicaDirectoryId()).thenReturn(new Some(Uuid.randomUuid()));
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(Uuid.randomUuid()));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.futureLocalLogOrException(t1p0())).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaManager.onlinePartition(t1p0())).thenReturn(new Some(partition));
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(BoxesRunTime.boxToBoolean(replicationQuotaManager.isQuotaExceeded())).thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.empty(), 5, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5).setEndOffset(0));
        Mockito.when(partition.futureLocalLogOrException()).thenReturn(abstractLog);
        ((Partition) Mockito.doNothing().when(partition)).truncateTo(0L, true);
        Mockito.when(BoxesRunTime.boxToBoolean(partition.maybeReplaceCurrentWithFutureReplica())).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToBoolean(partition.runCallbackIfFutureReplicaCaughtUp((Function1) ArgumentMatchers.any()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(None$.MODULE$);
        mockFetchFromCurrentLog(tid1p0(), new FullPartitionFetchMetadata(topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(5)), Optional.empty(), -1L), fromProps, replicaManager, new FetchPartitionData(Errors.NONE, 0L, 0L, MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false, new FetchPartitionStats(0L, 0L)));
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, new BrokerTopicStats(), directoryEventHandler);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 5))})));
        Assertions.assertTrue(replicaAlterLogDirsThread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, replicaAlterLogDirsThread.partitionCount());
        replicaAlterLogDirsThread.doWork();
        Assertions.assertTrue(replicaAlterLogDirsThread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, replicaAlterLogDirsThread.partitionCount());
        updateReassignmentState(replicaAlterLogDirsThread, 0, ReplicaAlterLogDirsThread$ReassignmentState$Queued$.MODULE$);
        replicaAlterLogDirsThread.removePartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{t1p0()})));
        Assertions.assertTrue(replicaAlterLogDirsThread.fetchState(t1p0()).isEmpty());
        Assertions.assertEquals(0, replicaAlterLogDirsThread.partitionCount());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(org.apache.kafka.server.common.TopicIdPartition.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Uuid.class);
        ((DirectoryEventHandler) Mockito.verify(directoryEventHandler)).handleAssignment((org.apache.kafka.server.common.TopicIdPartition) forClass.capture(), (Uuid) forClass2.capture(), (String) ArgumentMatchers.eq("Reverting reassignment for canceled future replica"), (Runnable) ArgumentMatchers.any());
        Assertions.assertEquals(new org.apache.kafka.server.common.TopicIdPartition(topicId(), t1p0().partition()), forClass.getValue());
        Assertions.assertEquals(partition.logDirectoryId().get(), forClass2.getValue());
    }

    @Test
    public void shouldRevertReassignmentsForIncompleteFutureReplicaPromotions() {
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        DirectoryEventHandler directoryEventHandler = (DirectoryEventHandler) Mockito.mock(DirectoryEventHandler.class);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, (BrokerTopicStats) Mockito.mock(BrokerTopicStats.class), directoryEventHandler);
        Seq seq = (Seq) ((IterableOps) Seq$.MODULE$.range(BoxesRunTime.boxToInteger(0), BoxesRunTime.boxToInteger(4), Numeric$IntIsIntegral$.MODULE$)).map(obj -> {
            return $anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$1(BoxesRunTime.unboxToInt(obj));
        });
        Seq seq2 = (Seq) ((IterableOps) Seq$.MODULE$.range(BoxesRunTime.boxToInteger(0), BoxesRunTime.boxToInteger(4), Numeric$IntIsIntegral$.MODULE$)).map(obj2 -> {
            return $anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$2(this, BoxesRunTime.unboxToInt(obj2));
        });
        Seq seq3 = (Seq) ((IterableOps) Seq$.MODULE$.range(BoxesRunTime.boxToInteger(0), BoxesRunTime.boxToInteger(4), Numeric$IntIsIntegral$.MODULE$)).map(obj3 -> {
            return $anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$3(BoxesRunTime.unboxToInt(obj3));
        });
        seq.foreach(topicPartition -> {
            return (ReplicaAlterLogDirsThread.PromotionState) replicaAlterLogDirsThread.promotionStates().put(topicPartition, new ReplicaAlterLogDirsThread.PromotionState(ReplicaAlterLogDirsThread$ReassignmentState$None$.MODULE$, new Some(this.topicId()), new Some(seq3.apply(topicPartition.partition()))));
        });
        replicaAlterLogDirsThread.updateReassignmentState((TopicPartition) seq.apply(0), ReplicaAlterLogDirsThread$ReassignmentState$None$.MODULE$);
        replicaAlterLogDirsThread.updateReassignmentState((TopicPartition) seq.apply(1), ReplicaAlterLogDirsThread$ReassignmentState$Queued$.MODULE$);
        replicaAlterLogDirsThread.updateReassignmentState((TopicPartition) seq.apply(2), ReplicaAlterLogDirsThread$ReassignmentState$Accepted$.MODULE$);
        replicaAlterLogDirsThread.updateReassignmentState((TopicPartition) seq.apply(3), ReplicaAlterLogDirsThread$ReassignmentState$Effective$.MODULE$);
        replicaAlterLogDirsThread.removePartitions(seq.toSet());
        ((DirectoryEventHandler) Mockito.verify(directoryEventHandler)).handleAssignment((org.apache.kafka.server.common.TopicIdPartition) ArgumentMatchers.eq(seq2.apply(1)), (Uuid) ArgumentMatchers.eq(seq3.apply(1)), (String) ArgumentMatchers.eq("Reverting reassignment for canceled future replica"), (Runnable) ArgumentMatchers.any());
        ((DirectoryEventHandler) Mockito.verify(directoryEventHandler)).handleAssignment((org.apache.kafka.server.common.TopicIdPartition) ArgumentMatchers.eq(seq2.apply(2)), (Uuid) ArgumentMatchers.eq(seq3.apply(2)), (String) ArgumentMatchers.eq("Reverting reassignment for canceled future replica"), (Runnable) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{directoryEventHandler});
    }

    private void mockFetchFromCurrentLog(final TopicIdPartition topicIdPartition, final PartitionFetchMetadata partitionFetchMetadata, KafkaConfig kafkaConfig, ReplicaManager replicaManager, FetchPartitionData fetchPartitionData) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Function1.class);
        final ReplicaAlterLogDirsThreadTest replicaAlterLogDirsThreadTest = null;
        replicaManager.fetchMessages((FetchParams) ArgumentMatchers.eq(new FetchParams(ApiKeys.FETCH.latestVersion(), -3, -1L, 0L, 0, Predef$.MODULE$.Integer2int(kafkaConfig.replicaFetchResponseMaxBytes()), FetchIsolation.LOG_END, Optional.empty())), (Seq) ArgumentMatchers.argThat(new ArgumentMatcher<Seq<Tuple2<TopicIdPartition, PartitionFetchMetadata>>>(replicaAlterLogDirsThreadTest, topicIdPartition, partitionFetchMetadata) { // from class: kafka.server.ReplicaAlterLogDirsThreadTest$$anon$1
            private final TopicIdPartition topicIdPartition$1;
            private final PartitionFetchMetadata requestData$1;

            public Class<?> type() {
                return super.type();
            }

            public boolean matches(Seq<Tuple2<TopicIdPartition, PartitionFetchMetadata>> seq) {
                if (seq == null) {
                    return false;
                }
                Option unapply = package$.MODULE$.$plus$colon().unapply(seq);
                if (unapply.isEmpty()) {
                    return false;
                }
                Tuple2 tuple2 = (Tuple2) ((Tuple2) unapply.get())._1();
                Seq seq2 = (Seq) ((Tuple2) unapply.get())._2();
                if (tuple2 == null) {
                    return false;
                }
                TopicIdPartition topicIdPartition2 = (TopicIdPartition) tuple2._1();
                PartitionFetchMetadata partitionFetchMetadata2 = (PartitionFetchMetadata) tuple2._2();
                if (seq2 == null) {
                    return false;
                }
                SeqOps unapplySeq = Seq$.MODULE$.unapplySeq(seq2);
                if (SeqFactory$UnapplySeqWrapper$.MODULE$.isEmpty$extension(unapplySeq)) {
                    return false;
                }
                new SeqFactory.UnapplySeqWrapper(SeqFactory$UnapplySeqWrapper$.MODULE$.get$extension(unapplySeq));
                if (SeqFactory$UnapplySeqWrapper$.MODULE$.lengthCompare$extension(SeqFactory$UnapplySeqWrapper$.MODULE$.get$extension(unapplySeq), 0) != 0) {
                    return false;
                }
                TopicIdPartition topicIdPartition3 = this.topicIdPartition$1;
                if (topicIdPartition2 == null) {
                    if (topicIdPartition3 != null) {
                        return false;
                    }
                } else if (!topicIdPartition2.equals(topicIdPartition3)) {
                    return false;
                }
                Uuid uuid = partitionFetchMetadata2.topicId();
                Uuid uuid2 = this.requestData$1.topicId();
                if (uuid == null) {
                    if (uuid2 != null) {
                        return false;
                    }
                } else if (!uuid.equals(uuid2)) {
                    return false;
                }
                if (partitionFetchMetadata2.fetchOffset() != this.requestData$1.fetchOffset()) {
                    return false;
                }
                Optional currentLeaderEpoch = partitionFetchMetadata2.currentLeaderEpoch();
                Optional currentLeaderEpoch2 = this.requestData$1.currentLeaderEpoch();
                if (currentLeaderEpoch == null) {
                    if (currentLeaderEpoch2 != null) {
                        return false;
                    }
                } else if (!currentLeaderEpoch.equals(currentLeaderEpoch2)) {
                    return false;
                }
                Optional lastFetchedEpoch = partitionFetchMetadata2.lastFetchedEpoch();
                Optional lastFetchedEpoch2 = this.requestData$1.lastFetchedEpoch();
                if (lastFetchedEpoch == null) {
                    if (lastFetchedEpoch2 != null) {
                        return false;
                    }
                } else if (!lastFetchedEpoch.equals(lastFetchedEpoch2)) {
                    return false;
                }
                return partitionFetchMetadata2.startOffset() == this.requestData$1.startOffset() && partitionFetchMetadata2.maxBytes() == this.requestData$1.maxBytes();
            }

            {
                this.topicIdPartition$1 = topicIdPartition;
                this.requestData$1 = partitionFetchMetadata;
            }
        }), (ReplicaQuota) ArgumentMatchers.eq(QuotaFactory$UnboundedQuota$.MODULE$), (Function1) forClass.capture(), ArgumentMatchers.eq(false));
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
            $anonfun$mockFetchFromCurrentLog$1(forClass, topicIdPartition, fetchPartitionData, invocationOnMock);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void issuesEpochRequestFromLocalReplica() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(1));
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.empty(), 2, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(2).setEndOffset(13));
        Mockito.when(replicaManager.getPartitionOrException(t1p1())).thenReturn(partition2);
        Mockito.when(partition2.lastOffsetForLeaderEpoch(Optional.empty(), 5, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(1).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5).setEndOffset(232));
        Assertions.assertEquals((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(t1p0().partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(2).setEndOffset(13)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(t1p1().partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5).setEndOffset(232))})), new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, (ReplicaQuota) null), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, (ReplicationQuotaManager) null, (BrokerTopicStats) null, DirectoryEventHandler.NOOP).leader().fetchEpochEndOffsets((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p0().partition()).setLeaderEpoch(2)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p1().partition()).setLeaderEpoch(5))}))), "results from leader epoch request should have offset from local replica");
    }

    @Test
    public void fetchEpochsFromLeaderShouldHandleExceptionFromGetLocalReplica() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.empty(), 2, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(2).setEndOffset(13));
        Mockito.when(replicaManager.getPartitionOrException(t1p1())).thenThrow(new Throwable[]{new KafkaStorageException()});
        Assertions.assertEquals((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(t1p0().partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(2).setEndOffset(13)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(t1p1().partition()).setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()))})), new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, (ReplicaQuota) null), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, (ReplicationQuotaManager) null, (BrokerTopicStats) null, DirectoryEventHandler.NOOP).leader().fetchEpochEndOffsets((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p0().partition()).setLeaderEpoch(2)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p1().partition()).setLeaderEpoch(2))}))));
    }

    @Test
    public void shouldTruncateToReplicaOffset() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        AbstractLog abstractLog2 = (AbstractLog) Mockito.mock(AbstractLog.class);
        AbstractLog abstractLog3 = (AbstractLog) Mockito.mock(AbstractLog.class);
        AbstractLog abstractLog4 = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ArgumentCaptor<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> forClass3 = ArgumentCaptor.forClass(Function1.class);
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(BoxesRunTime.boxToInteger(partition2.partitionId())).thenReturn(BoxesRunTime.boxToInteger(1));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.getPartitionOrException(t1p1())).thenReturn(partition2);
        Mockito.when(replicaManager.futureLocalLogOrException(t1p0())).thenReturn(abstractLog3);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaManager.futureLocalLogOrException(t1p1())).thenReturn(abstractLog4);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p1()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog3.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(191)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog4.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(191)));
        Mockito.when(abstractLog3.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(2)));
        Mockito.when(abstractLog3.endOffsetForEpoch(2)).thenReturn(new Some(new OffsetAndEpoch(191, 2)));
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 2, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(2).setEndOffset(190));
        Mockito.when(abstractLog4.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(2)));
        Mockito.when(abstractLog4.endOffsetForEpoch(2)).thenReturn(new Some(new OffsetAndEpoch(191, 2)));
        Mockito.when(partition2.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 2, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(1).setErrorCode(Errors.NONE.code()).setLeaderEpoch(2).setEndOffset(192));
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(Uuid.fromString("Jsg8ufNCQYONNquPt7VYpA")));
        Mockito.when(partition2.logDirectoryId()).thenReturn(new Some(Uuid.fromString("D2Yf6FtNROGVKoIZadSFIg")));
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        stubWithFetchMessages(abstractLog, abstractLog2, abstractLog3, partition, replicaManager, forClass3);
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, (BrokerTopicStats) null, DirectoryEventHandler.NOOP);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        replicaAlterLogDirsThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        ((Partition) Mockito.verify(partition2)).truncateTo(BoxesRunTime.unboxToLong(forClass2.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(190, BoxesRunTime.unboxToLong(forClass.getValue()));
        Assertions.assertEquals(191, BoxesRunTime.unboxToLong(forClass2.getValue()));
    }

    @Test
    public void shouldTruncateToEndOffsetOfLargestCommonEpoch() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        AbstractLog abstractLog2 = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ArgumentCaptor<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> forClass2 = ArgumentCaptor.forClass(Function1.class);
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.futureLocalLogOrException(t1p0())).thenReturn(abstractLog2);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog2.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(195)));
        Mockito.when(abstractLog2.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5))).thenReturn(new Some(BoxesRunTime.boxToInteger(5 - 2)));
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 5, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5 - 1).setEndOffset(200));
        Mockito.when(abstractLog2.endOffsetForEpoch(5 - 1)).thenReturn(new Some(new OffsetAndEpoch(195, 5 - 2)));
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 5 - 2, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5 - 2).setEndOffset(190));
        Mockito.when(abstractLog2.endOffsetForEpoch(5 - 2)).thenReturn(new Some(new OffsetAndEpoch(191, 5 - 2)));
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(Uuid.fromString("n6WOe2zPScqZLIreCWN6Ug")));
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        stubWithFetchMessages(abstractLog, null, abstractLog2, partition, replicaManager, forClass2);
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, (BrokerTopicStats) null, DirectoryEventHandler.NOOP);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1))})));
        replicaAlterLogDirsThread.doWork();
        replicaAlterLogDirsThread.doWork();
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.eq(true));
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(190)), new StringBuilder(48).append("Expected offset ").append(190).append(" in captured truncation offsets ").append(forClass.getAllValues()).toString());
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        AbstractLog abstractLog2 = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ArgumentCaptor<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> forClass2 = ArgumentCaptor.forClass(Function1.class);
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.futureLocalLogOrException(t1p0())).thenReturn(abstractLog2);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(Uuid.fromString("b2e1ihvGQiu6A504oKoddQ")));
        Mockito.when(abstractLog2.latestEpoch()).thenReturn(None$.MODULE$);
        stubWithFetchMessages(abstractLog, null, abstractLog2, partition, replicaManager, forClass2);
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, (BrokerTopicStats) null, DirectoryEventHandler.NOOP);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(100, 1))})));
        replicaAlterLogDirsThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.eq(true));
        Assertions.assertEquals(100, BoxesRunTime.unboxToLong(forClass.getValue()), "Expected future replica to truncate to initial fetch offset if replica returns UNDEFINED_EPOCH_OFFSET");
    }

    @Test
    public void shouldPollIndefinitelyIfReplicaNotAvailable() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        AbstractLog abstractLog2 = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Function1.class);
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(Uuid.fromString("wO7bUpvcSZC0QKEK6P6AiA")));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.futureLocalLogOrException(t1p0())).thenReturn(abstractLog2);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog2.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(290)));
        Mockito.when(abstractLog2.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(1)));
        Mockito.when(abstractLog2.endOffsetForEpoch(1)).thenReturn(new Some(new OffsetAndEpoch(290, 1)));
        Mockito.when(replicaManager.localLog(t1p0())).thenReturn(new Some(abstractLog));
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 1, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code())).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code())).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code())).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(1).setEndOffset(300));
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        replicaManager.fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(), (Function1) forClass2.capture(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()));
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
            $anonfun$shouldPollIndefinitelyIfReplicaNotAvailable$1(forClass2, invocationOnMock);
            return BoxedUnit.UNIT;
        });
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, (BrokerTopicStats) null, DirectoryEventHandler.NOOP);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1))})));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp(i -> {
            replicaAlterLogDirsThread.doWork();
        });
        ((Partition) Mockito.verify(partition, Mockito.never())).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.eq(true));
        Assertions.assertEquals(0, forClass.getAllValues().size());
        replicaAlterLogDirsThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.eq(true));
        Assertions.assertEquals(290, BoxesRunTime.unboxToLong(forClass.getValue()));
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnly() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        AbstractLog abstractLog2 = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ArgumentCaptor<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> forClass = ArgumentCaptor.forClass(Function1.class);
        Mockito.when(BoxesRunTime.boxToInteger(partition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(Uuid.fromString("dybMM9CpRP2s6HSslW4NHg")));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 5, false)).thenReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5).setEndOffset(213));
        Mockito.when(replicaManager.futureLocalLogOrException(t1p0())).thenReturn(abstractLog2);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(abstractLog2.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog2.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(190)));
        Mockito.when(abstractLog2.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(190, 5)));
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        stubWithFetchMessages(abstractLog, null, abstractLog2, partition, replicaManager, forClass);
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, (BrokerTopicStats) null, DirectoryEventHandler.NOOP);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1))})));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(i -> {
            replicaAlterLogDirsThread.doWork();
        });
        ((Partition) Mockito.verify(partition)).lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 5, false);
        ((Partition) Mockito.verify(partition)).truncateTo(Int$.MODULE$.int2long(190), true);
    }

    @Test
    public void shouldFetchOneReplicaAtATime() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        AbstractLog abstractLog2 = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.getPartitionOrException(t1p1())).thenReturn(partition);
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(Uuid.fromString("Y0qUL19gSmKAXmohmrUM4g")));
        stub(abstractLog, null, abstractLog2, partition, replicaManager);
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, (BrokerTopicStats) null, DirectoryEventHandler.NOOP);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        LeaderEndPoint leader = replicaAlterLogDirsThread.leader();
        scala.collection.Map$ map$ = scala.collection.Map$.MODULE$;
        ScalaRunTime$ scalaRunTime$ = ScalaRunTime$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(t1p0());
        Some some = new Some(topicId());
        None$ none$5 = None$.MODULE$;
        None$ none$6 = None$.MODULE$;
        Fetching$ fetching$ = Fetching$.MODULE$;
        None$ none$7 = None$.MODULE$;
        None$ none$8 = None$.MODULE$;
        PartitionFetchState$ partitionFetchState$ = PartitionFetchState$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$2 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc2 = Predef$.MODULE$.ArrowAssoc(t1p1());
        Some some2 = new Some(topicId());
        None$ none$9 = None$.MODULE$;
        None$ none$10 = None$.MODULE$;
        Fetching$ fetching$2 = Fetching$.MODULE$;
        None$ none$11 = None$.MODULE$;
        None$ none$12 = None$.MODULE$;
        PartitionFetchState$ partitionFetchState$2 = PartitionFetchState$.MODULE$;
        AbstractFetcherThread.ResultWithPartitions buildFetch = leader.buildFetch((scala.collection.Map) map$.apply(scalaRunTime$.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$.$minus$greater$extension(ArrowAssoc, new PartitionFetchState(some, 150L, none$5, 1, none$6, fetching$, none$7, none$8, 0)), predef$ArrowAssoc$2.$minus$greater$extension(ArrowAssoc2, new PartitionFetchState(some2, 160L, none$9, 1, none$10, fetching$2, none$11, none$12, 0))})));
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) buildFetch.result();
        scala.collection.Map partitionsWithError = buildFetch.partitionsWithError();
        Assertions.assertTrue(option.isDefined());
        FetchRequest.Builder fetchRequest = ((AbstractFetcherThread.ReplicaFetch) option.get()).fetchRequest();
        Assertions.assertFalse(fetchRequest.fetchData().isEmpty());
        Assertions.assertFalse(partitionsWithError.nonEmpty());
        FetchRequest build = fetchRequest.build();
        Assertions.assertEquals(0, build.minBytes());
        scala.collection.immutable.Seq seq = CollectionConverters$.MODULE$.MapHasAsScala(build.fetchData(CollectionConverters$.MODULE$.MapHasAsJava(topicNames()).asJava())).asScala().toSeq();
        Assertions.assertEquals(1, seq.length());
        Assertions.assertEquals(t1p0(), ((TopicIdPartition) ((Tuple2) seq.head())._1()).topicPartition(), "Expected fetch request for first partition");
        Assertions.assertEquals(150L, ((FetchRequest.PartitionData) ((Tuple2) seq.head())._2()).fetchOffset);
    }

    @Test
    public void shouldFetchNonDelayedAndNonTruncatingReplicas() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        AbstractLog abstractLog2 = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog2.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(123)));
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.getPartitionOrException(t1p1())).thenReturn(partition);
        Mockito.when(partition.logDirectoryId()).thenReturn(new Some(Uuid.fromString("rtrdy3nsQwO1OQUEUYGxRQ")));
        stub(abstractLog, null, abstractLog2, partition, replicaManager);
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new LocalLeaderEndPoint(new BrokerEndPoint(0, "localhost", 1000), fromProps, replicaManager, replicationQuotaManager), failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, (BrokerTopicStats) null, DirectoryEventHandler.NOOP);
        replicaAlterLogDirsThread.addPartitions((scala.collection.Map) scala.collection.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        LeaderEndPoint leader = replicaAlterLogDirsThread.leader();
        scala.collection.Map$ map$ = scala.collection.Map$.MODULE$;
        ScalaRunTime$ scalaRunTime$ = ScalaRunTime$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(t1p0());
        PartitionFetchState$ partitionFetchState$ = PartitionFetchState$.MODULE$;
        Some some = new Some(topicId());
        None$ none$5 = None$.MODULE$;
        Fetching$ fetching$ = Fetching$.MODULE$;
        None$ none$6 = None$.MODULE$;
        None$ none$7 = None$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$2 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc2 = Predef$.MODULE$.ArrowAssoc(t1p1());
        PartitionFetchState$ partitionFetchState$2 = PartitionFetchState$.MODULE$;
        AbstractFetcherThread.ResultWithPartitions buildFetch = leader.buildFetch((scala.collection.Map) map$.apply(scalaRunTime$.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$.$minus$greater$extension(ArrowAssoc, new PartitionFetchState(some, 150L, none$5, 1, None$.MODULE$, fetching$, none$6, none$7, 0)), predef$ArrowAssoc$2.$minus$greater$extension(ArrowAssoc2, new PartitionFetchState(new Some(topicId()), 160L, None$.MODULE$, 1, None$.MODULE$, Truncating$.MODULE$, None$.MODULE$, None$.MODULE$, 0))})));
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) buildFetch.result();
        scala.collection.Map partitionsWithError = buildFetch.partitionsWithError();
        Assertions.assertTrue(option.isDefined());
        AbstractFetcherThread.ReplicaFetch replicaFetch = (AbstractFetcherThread.ReplicaFetch) option.get();
        Assertions.assertFalse(replicaFetch.partitionData().isEmpty());
        Assertions.assertFalse(partitionsWithError.nonEmpty());
        scala.collection.immutable.Seq seq = CollectionConverters$.MODULE$.MapHasAsScala(replicaFetch.fetchRequest().build().fetchData(CollectionConverters$.MODULE$.MapHasAsJava(topicNames()).asJava())).asScala().toSeq();
        Assertions.assertEquals(1, seq.length());
        Assertions.assertEquals(t1p0(), ((TopicIdPartition) ((Tuple2) seq.head())._1()).topicPartition(), "Expected fetch request for non-truncating partition");
        Assertions.assertEquals(150L, ((FetchRequest.PartitionData) ((Tuple2) seq.head())._2()).fetchOffset);
        LeaderEndPoint leader2 = replicaAlterLogDirsThread.leader();
        scala.collection.Map$ map$2 = scala.collection.Map$.MODULE$;
        ScalaRunTime$ scalaRunTime$2 = ScalaRunTime$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$3 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc3 = Predef$.MODULE$.ArrowAssoc(t1p0());
        PartitionFetchState$ partitionFetchState$3 = PartitionFetchState$.MODULE$;
        Some some2 = new Some(topicId());
        None$ none$8 = None$.MODULE$;
        Fetching$ fetching$2 = Fetching$.MODULE$;
        None$ none$9 = None$.MODULE$;
        None$ none$10 = None$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$4 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc4 = Predef$.MODULE$.ArrowAssoc(t1p1());
        Some some3 = new Some(topicId());
        None$ none$11 = None$.MODULE$;
        Some some4 = new Some(new DelayedItem(5000L));
        Fetching$ fetching$3 = Fetching$.MODULE$;
        None$ none$12 = None$.MODULE$;
        None$ none$13 = None$.MODULE$;
        PartitionFetchState$ partitionFetchState$4 = PartitionFetchState$.MODULE$;
        AbstractFetcherThread.ResultWithPartitions buildFetch2 = leader2.buildFetch((scala.collection.Map) map$2.apply(scalaRunTime$2.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$3.$minus$greater$extension(ArrowAssoc3, new PartitionFetchState(some2, 140L, none$8, 1, None$.MODULE$, fetching$2, none$9, none$10, 0)), predef$ArrowAssoc$4.$minus$greater$extension(ArrowAssoc4, new PartitionFetchState(some3, 160L, none$11, 1, some4, fetching$3, none$12, none$13, 0))})));
        if (buildFetch2 == null) {
            throw new MatchError((Object) null);
        }
        Option option2 = (Option) buildFetch2.result();
        scala.collection.Map partitionsWithError2 = buildFetch2.partitionsWithError();
        Assertions.assertTrue(option2.isDefined());
        AbstractFetcherThread.ReplicaFetch replicaFetch2 = (AbstractFetcherThread.ReplicaFetch) option2.get();
        Assertions.assertFalse(replicaFetch2.partitionData().isEmpty());
        Assertions.assertFalse(partitionsWithError2.nonEmpty());
        scala.collection.immutable.Seq seq2 = CollectionConverters$.MODULE$.MapHasAsScala(replicaFetch2.fetchRequest().build().fetchData(CollectionConverters$.MODULE$.MapHasAsJava(topicNames()).asJava())).asScala().toSeq();
        Assertions.assertEquals(1, seq2.length());
        Assertions.assertEquals(t1p0(), ((TopicIdPartition) ((Tuple2) seq2.head())._1()).topicPartition(), "Expected fetch request for non-delayed partition");
        Assertions.assertEquals(140L, ((FetchRequest.PartitionData) ((Tuple2) seq2.head())._2()).fetchOffset);
        LeaderEndPoint leader3 = replicaAlterLogDirsThread.leader();
        scala.collection.Map$ map$3 = scala.collection.Map$.MODULE$;
        ScalaRunTime$ scalaRunTime$3 = ScalaRunTime$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$5 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc5 = Predef$.MODULE$.ArrowAssoc(t1p0());
        Some some5 = new Some(topicId());
        None$ none$14 = None$.MODULE$;
        Some some6 = new Some(new DelayedItem(5000L));
        Fetching$ fetching$4 = Fetching$.MODULE$;
        None$ none$15 = None$.MODULE$;
        None$ none$16 = None$.MODULE$;
        PartitionFetchState$ partitionFetchState$5 = PartitionFetchState$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$6 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc6 = Predef$.MODULE$.ArrowAssoc(t1p1());
        Some some7 = new Some(topicId());
        None$ none$17 = None$.MODULE$;
        Some some8 = new Some(new DelayedItem(5000L));
        Fetching$ fetching$5 = Fetching$.MODULE$;
        None$ none$18 = None$.MODULE$;
        None$ none$19 = None$.MODULE$;
        PartitionFetchState$ partitionFetchState$6 = PartitionFetchState$.MODULE$;
        AbstractFetcherThread.ResultWithPartitions buildFetch3 = leader3.buildFetch((scala.collection.Map) map$3.apply(scalaRunTime$3.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$5.$minus$greater$extension(ArrowAssoc5, new PartitionFetchState(some5, 140L, none$14, 1, some6, fetching$4, none$15, none$16, 0)), predef$ArrowAssoc$6.$minus$greater$extension(ArrowAssoc6, new PartitionFetchState(some7, 160L, none$17, 1, some8, fetching$5, none$18, none$19, 0))})));
        if (buildFetch3 == null) {
            throw new MatchError((Object) null);
        }
        Option option3 = (Option) buildFetch3.result();
        scala.collection.Map partitionsWithError3 = buildFetch3.partitionsWithError();
        Assertions.assertTrue(option3.isEmpty(), "Expected no fetch requests since all partitions are delayed");
        Assertions.assertFalse(partitionsWithError3.nonEmpty());
    }

    public void stub(AbstractLog abstractLog, AbstractLog abstractLog2, AbstractLog abstractLog3, Partition partition, ReplicaManager replicaManager) {
        Mockito.when(replicaManager.localLog(t1p0())).thenReturn(new Some(abstractLog));
        Mockito.when(replicaManager.localLogOrException(t1p0())).thenReturn(abstractLog);
        Mockito.when(replicaManager.futureLocalLogOrException(t1p0())).thenReturn(abstractLog3);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaManager.onlinePartition(t1p0())).thenReturn(new Some(partition));
        Mockito.when(replicaManager.localLog(t1p1())).thenReturn(new Some(abstractLog2));
        Mockito.when(replicaManager.localLogOrException(t1p1())).thenReturn(abstractLog2);
        Mockito.when(replicaManager.futureLocalLogOrException(t1p1())).thenReturn(abstractLog3);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p1()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaManager.onlinePartition(t1p1())).thenReturn(new Some(partition));
    }

    public void stubWithFetchMessages(AbstractLog abstractLog, AbstractLog abstractLog2, AbstractLog abstractLog3, Partition partition, ReplicaManager replicaManager, ArgumentCaptor<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> argumentCaptor) {
        stub(abstractLog, abstractLog2, abstractLog3, partition, replicaManager);
        replicaManager.fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(), (Function1) argumentCaptor.capture(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()));
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
            $anonfun$stubWithFetchMessages$1(argumentCaptor, invocationOnMock);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ void $anonfun$updateReassignmentState$1(ReplicaAlterLogDirsThread replicaAlterLogDirsThread, int i, ReplicaAlterLogDirsThread.ReassignmentState reassignmentState, String str) {
        replicaAlterLogDirsThread.updateReassignmentState(new TopicPartition(str, i), reassignmentState);
    }

    public static final /* synthetic */ TopicPartition $anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$1(int i) {
        return new TopicPartition("t", i);
    }

    public static final /* synthetic */ org.apache.kafka.server.common.TopicIdPartition $anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$2(ReplicaAlterLogDirsThreadTest replicaAlterLogDirsThreadTest, int i) {
        return new org.apache.kafka.server.common.TopicIdPartition(replicaAlterLogDirsThreadTest.topicId(), i);
    }

    public static final /* synthetic */ Uuid $anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$3(int i) {
        return Uuid.fromString(new StringBuilder(21).append("TESTBROKER0000DIR").append(i).append("AAAA").toString());
    }

    public static final /* synthetic */ void $anonfun$mockFetchFromCurrentLog$1(ArgumentCaptor argumentCaptor, TopicIdPartition topicIdPartition, FetchPartitionData fetchPartitionData, InvocationOnMock invocationOnMock) {
        ((Function1) argumentCaptor.getValue()).apply(new $colon.colon(new Tuple2(topicIdPartition, fetchPartitionData), Nil$.MODULE$));
    }

    public static final /* synthetic */ void $anonfun$shouldPollIndefinitelyIfReplicaNotAvailable$1(ArgumentCaptor argumentCaptor, InvocationOnMock invocationOnMock) {
        ((Function1) argumentCaptor.getValue()).apply(Seq$.MODULE$.empty());
    }

    public static final /* synthetic */ void $anonfun$stubWithFetchMessages$1(ArgumentCaptor argumentCaptor, InvocationOnMock invocationOnMock) {
        ((Function1) argumentCaptor.getValue()).apply(Seq$.MODULE$.empty());
    }

    public ReplicaAlterLogDirsThreadTest() {
        MetadataVersion latestTesting = MetadataVersion.latestTesting();
        BrokerFeatures createEmpty = BrokerFeatures$.MODULE$.createEmpty();
        ZkMetadataCache$ zkMetadataCache$ = ZkMetadataCache$.MODULE$;
        Seq empty = Seq$.MODULE$.empty();
        ZkMetadataCache$ zkMetadataCache$2 = ZkMetadataCache$.MODULE$;
        ZkMetadataCache$ zkMetadataCache$3 = ZkMetadataCache$.MODULE$;
        this.metadataCache = new ZkMetadataCache(0, latestTesting, createEmpty, empty, false, false);
        metadataCache().updateMetadata(0, updateMetadataRequest());
    }
}
