package kafka.server;

import java.util.Optional;
import kafka.api.Request$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.utils.DelayedItem;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: ReplicaAlterLogDirsThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015f\u0001B\r\u001b\u0001}AQA\n\u0001\u0005\u0002\u001dBqA\u000b\u0001C\u0002\u0013%1\u0006\u0003\u00048\u0001\u0001\u0006I\u0001\f\u0005\bq\u0001\u0011\r\u0011\"\u0003,\u0011\u0019I\u0004\u0001)A\u0005Y!9!\b\u0001b\u0001\n\u0013Y\u0004BB \u0001A\u0003%A\bC\u0003A\u0001\u0011%\u0011\tC\u0004P\u0001E\u0005I\u0011\u0002)\t\u000bm\u0003A\u0011\u0001/\t\u000b-\u0004A\u0011\u0001/\t\u000b5\u0004A\u0011\u0001/\t\u000b=\u0004A\u0011\u00029\t\r\u0005=\u0001\u0001\"\u0001]\u0011\u0019\t\u0019\u0002\u0001C\u00019\"1\u0011q\u0003\u0001\u0005\u0002qCa!a\u0007\u0001\t\u0003a\u0006BBA\u0010\u0001\u0011\u0005A\f\u0003\u0004\u0002$\u0001!\t\u0001\u0018\u0005\u0007\u0003O\u0001A\u0011\u0001/\t\r\u0005-\u0002\u0001\"\u0001]\u0011\u0019\ty\u0003\u0001C\u00019\"9\u00111\u0007\u0001\u0005\u0002\u0005U\u0002bBA:\u0001\u0011\u0005\u0011Q\u000f\u0002\u001e%\u0016\u0004H.[2b\u00032$XM\u001d'pO\u0012K'o\u001d+ie\u0016\fG\rV3ti*\u00111\u0004H\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003u\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001AA\u0011\u0011\u0005J\u0007\u0002E)\t1%A\u0003tG\u0006d\u0017-\u0003\u0002&E\t1\u0011I\\=SK\u001a\fa\u0001P5oSRtD#\u0001\u0015\u0011\u0005%\u0002Q\"\u0001\u000e\u0002\tQ\f\u0004\u000fM\u000b\u0002YA\u0011Q&N\u0007\u0002])\u0011q\u0006M\u0001\u0007G>lWn\u001c8\u000b\u0005u\t$B\u0001\u001a4\u0003\u0019\t\u0007/Y2iK*\tA'A\u0002pe\u001eL!A\u000e\u0018\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\u0006)A/\r91A\u0005!A/\r92\u0003\u0015!\u0018\u0007]\u0019!\u0003A1\u0017-\u001b7fIB\u000b'\u000f^5uS>t7/F\u0001=!\tIS(\u0003\u0002?5\t\u0001b)Y5mK\u0012\u0004\u0016M\u001d;ji&|gn]\u0001\u0012M\u0006LG.\u001a3QCJ$\u0018\u000e^5p]N\u0004\u0013!E5oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uKR\u0019!)\u0012&\u0011\u0005%\u001a\u0015B\u0001#\u001b\u0005EIe.\u001b;jC24U\r^2i'R\fG/\u001a\u0005\u0006\r\"\u0001\raR\u0001\fM\u0016$8\r[(gMN,G\u000f\u0005\u0002\"\u0011&\u0011\u0011J\t\u0002\u0005\u0019>tw\rC\u0004L\u0011A\u0005\t\u0019\u0001'\u0002\u0
0171,\u0017\rZ3s\u000bB|7\r\u001b\t\u0003C5K!A\u0014\u0012\u0003\u0007%sG/A\u000ej]&$\u0018.\u00197GKR\u001c\u0007n\u0015;bi\u0016$C-\u001a4bk2$HEM\u000b\u0002#*\u0012AJU\u0016\u0002'B\u0011A+W\u0007\u0002+*\u0011akV\u0001\nk:\u001c\u0007.Z2lK\u0012T!\u0001\u0017\u0012\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0002[+\n\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002YMDw.\u001e7e\u001d>$\u0018\t\u001a3QCJ$\u0018\u000e^5p]&3g)\u001e;ve\u0016dunZ%t\u001d>$H)\u001a4j]\u0016$G#A/\u0011\u0005\u0005r\u0016BA0#\u0005\u0011)f.\u001b;)\u0005)\t\u0007C\u00012j\u001b\u0005\u0019'B\u00013f\u0003\r\t\u0007/\u001b\u0006\u0003M\u001e\fqA[;qSR,'O\u0003\u0002ig\u0005)!.\u001e8ji&\u0011!n\u0019\u0002\u0005)\u0016\u001cH/\u0001\u0017tQ>,H\u000eZ+qI\u0006$X\rT3bI\u0016\u0014X\t]8dQ\u00063G/\u001a:GK:\u001cW\rZ#q_\u000eDWI\u001d:pe\"\u00121\"Y\u0001'g\"|W\u000f\u001c3SKBd\u0017mY3DkJ\u0014XM\u001c;M_\u001e$\u0015N],iK:\u001c\u0015-^4iiV\u0003\bF\u0001\u0007b\u0003]iwnY6GKR\u001c\u0007N\u0012:p[\u000e+(O]3oi2{w\rF\u0004^cNDX0!\u0002\t\u000bIl\u0001\u0019\u0001\u0017\u0002\u001dQ|\u0007/[2QCJ$\u0018\u000e^5p]\")A/\u0004a\u0001k\u0006Y!/Z9vKN$H)\u0019;b!\tIc/\u0003\u0002x5\t1\u0002+\u0019:uSRLwN\u001c$fi\u000eDW*\u001a;bI\u0006$\u0018\rC\u0003z\u001b\u0001\u0007!0\u0001\u0004d_:4\u0017n\u001a\t\u0003SmL!\u0001 
\u000e\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\u0006}6\u0001\ra`\u0001\u000fe\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3s!\rI\u0013\u0011A\u0005\u0004\u0003\u0007Q\"A\u0004*fa2L7-Y'b]\u0006<WM\u001d\u0005\b\u0003\u000fi\u0001\u0019AA\u0005\u00031\u0011Xm\u001d9p]N,G)\u0019;b!\rI\u00131B\u0005\u0004\u0003\u001bQ\"A\u0005$fi\u000eD\u0007+\u0019:uSRLwN\u001c#bi\u0006\f!%[:tk\u0016\u001cX\t]8dQJ+\u0017/^3ti\u001a\u0013x.\u001c'pG\u0006d'+\u001a9mS\u000e\f\u0007F\u0001\bb\u0003u2W\r^2i\u000bB|7\r[:Ge>lG*Z1eKJ\u001c\u0006n\\;mI\"\u000bg\u000e\u001a7f\u000bb\u001cW\r\u001d;j_:4%o\\7HKRdunY1m%\u0016\u0004H.[2bQ\ty\u0011-A\u000ftQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_J+\u0007\u000f\\5dC>3gm]3uQ\t\u0001\u0012-A\u0017tQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_\u0016sGm\u00144gg\u0016$xJ\u001a'be\u001e,7\u000f^\"p[6|g.\u00129pG\"D#!E1\u0002\u0003NDw.\u001e7e)J,hnY1uKR{\u0017J\\5uS\u0006dg)\u001a;dQ>3gm]3u\u0013\u001a\u0014V\r\u001d7jG\u0006\u0014V\r^;s]N,f\u000eZ3gS:,Gm\u00144gg\u0016$\bF\u0001\nb\u0003-\u001a\bn\\;mIB{G\u000e\\%oI\u00164\u0017N\\5uK2L\u0018J\u001a*fa2L7-\u0019(pi\u00063\u0018-\u001b7bE2,\u0007FA\nb\u0003\u0019\u001a\bn\\;mI\u001a+Go\u00195MK\u0006$WM]#q_\u000eDwJ\u001c$jeN$h)\u001a;dQ>sG.\u001f\u0015\u0003)\u0005\fAd\u001d5pk2$g)\u001a;dQ>sWMU3qY&\u001c\u0017-\u0011;B)&lW\r\u000b\u0002\u0016C\u0006i3\u000f[8vY\u00124U\r^2i\u001d>tG)\u001a7bs\u0016$\u0017I\u001c3O_:$&/\u001e8dCRLgn\u001a*fa2L7-Y:)\u0005Y\t\u0017\u0001B:uk\n$B\"a\u000e\u0002V\u0005\u0015\u0014\u0011NA7\u0003c\u0002b!!\u000f\u0002@\u0005\rSBAA\u001e\u0015\r\tidM\u0001\tK\u0006\u001c\u00180\\8dW&!\u0011\u0011IA\u001e\u0005MIU\t\u001f9fGR\fG/[8o'\u0016$H/\u001a:t!\u0015\t\u0013QIA%\u0013\r\t9E\t\u0002\u0007\u001fB$\u0018n\u001c8\u0011\t\u0005-\u0013\u0011K\u0007\u0003\u0003\u001bR1!a\u0014\u001d\u0003\u001d\u0019G.^:uKJLA!a\u0015\u0002N\tI\u0001+\u0019:uSRLwN\u001c\u0005\b\u0003/:\u0002\u0019AA-\u0003\u001dawn\u001a+2aB\u0002B!a\u0017\u0002b5\u0011\u0011Q\f\u0006\u0004\u0003?b\u0012a\u00017pO&!\u00111
MA/\u0005-\t%m\u001d;sC\u000e$Hj\\4\t\u000f\u0005\u001dt\u00031\u0001\u0002Z\u00059An\\4UcA\f\u0004bBA6/\u0001\u0007\u0011\u0011L\u0001\nMV$XO]3M_\u001eDq!a\u001c\u0018\u0001\u0004\tI%A\u0005qCJ$\u0018\u000e^5p]\")ap\u0006a\u0001\u007f\u0006)2\u000f^;c/&$\bNR3uG\"lUm]:bO\u0016\u001cHCDA<\u0003s\nY(! \u0002��\u0005\u0005\u00151\u0011\t\u0006\u0003s\ty$\u0018\u0005\b\u0003/B\u0002\u0019AA-\u0011\u001d\t9\u0007\u0007a\u0001\u00033Bq!a\u001b\u0019\u0001\u0004\tI\u0006C\u0004\u0002pa\u0001\r!!\u0013\t\u000byD\u0002\u0019A@\t\u000f\u0005\u0015\u0005\u00041\u0001\u0002\b\u0006\u0001\"/Z:q_:\u001cXmQ1mY\n\f7m\u001b\t\u0007\u0003s\tI)!$\n\t\u0005-\u00151\b\u0002\b\u0007\u0006\u0004H/\u001e:f!\u0019\t\u0013qRAJ;&\u0019\u0011\u0011\u0013\u0012\u0003\u0013\u0019+hn\u0019;j_:\f\u0004CBAK\u00037\u000by*\u0004\u0002\u0002\u0018*\u0019\u0011\u0011\u0014\u0012\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002\u001e\u0006]%aA*fcB1\u0011%!)-\u0003\u0013I1!a)#\u0005\u0019!V\u000f\u001d7fe\u0001")
/* loaded from: input_file:kafka/server/ReplicaAlterLogDirsThreadTest.class */
public class ReplicaAlterLogDirsThreadTest {
    // Partition 0 of "topic1"; the partition most tests in this class operate on.
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    // Partition 1 of "topic1"; used by tests that need a second partition.
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    // Failed-partition registry handed to every ReplicaAlterLogDirsThread built by these tests.
    private final FailedPartitions failedPartitions = new FailedPartitions();

    /** Returns the fixture for partition 0 of "topic1". */
    private TopicPartition t1p0() {
        return t1p0;
    }

    /** Returns the fixture for partition 1 of "topic1". */
    private TopicPartition t1p1() {
        return t1p1;
    }

    /** Returns the failed-partition registry shared by the threads created in this test instance. */
    private FailedPartitions failedPartitions() {
        return failedPartitions;
    }

    /**
     * Builds an {@link InitialFetchState} whose source broker is a fixed endpoint
     * (id 0, "localhost", port 9092).
     *
     * @param j forwarded as the third constructor argument; call sites pass the starting offset here
     * @param i forwarded as the second constructor argument; call sites pass the leader epoch here
     * @return the newly constructed fetch state
     */
    private InitialFetchState initialFetchState(long j, int i) {
        BrokerEndPoint sourceBroker = new BrokerEndPoint(0, "localhost", 9092);
        return new InitialFetchState(sourceBroker, i, j);
    }

    /**
     * Supplies the value used for the second argument of {@code initialFetchState} when a
     * caller does not choose one explicitly (the method name follows the Scala compiler's
     * default-argument naming scheme).
     *
     * @return the default value, 1
     */
    private int initialFetchState$default$2() {
        final int defaultSecondArgument = 1;
        return defaultSecondArgument;
    }

    /**
     * When the replica manager reports that no future log exists for the partition,
     * {@code addPartitions} must return an empty set and the thread must not track it.
     */
    @Test
    public void shouldNotAddPartitionIfFutureLogIsNotDefined() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        ReplicaManager replicaManagerMock = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaManagerMock = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);

        // The future log is reported as missing for t1p0.
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManagerMock.futureLogExists(t1p0())))
                .thenReturn(BoxesRunTime.boxToBoolean(false));

        BrokerEndPoint sourceBroker = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread", sourceBroker, config, failedPartitions(),
                replicaManagerMock, quotaManagerMock, new BrokerTopicStats());

        Tuple2 partitionAndState = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p0()),
                initialFetchState(0L, initialFetchState$default$2()));

        Assertions.assertEquals(
                Predef$.MODULE$.Set().empty(),
                thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{partitionAndState}))));
        Assertions.assertEquals(0, thread.partitionCount());
        Assertions.assertEquals(None$.MODULE$, thread.fetchState(t1p0()));
    }

    /**
     * Adds t1p0 with a leader epoch one below the current one, answers the fetch with
     * FENCED_LEADER_EPOCH and checks the partition is marked failed and dropped. The
     * partition is then re-added with the current epoch, the fetch succeeds, and the
     * test checks that the partition is no longer failed and is removed from the thread.
     */
    @Test
    public void shouldUpdateLeaderEpochAfterFencedEpochError() {
        final int partitionId = 0;
        final int leaderEpoch = 5;

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));
        int maxBytes = Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes());

        Partition partitionMock = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManagerMock = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaManagerMock = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        AbstractLog futureLogMock = (AbstractLog) Mockito.mock(AbstractLog.class);

        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset =
                new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(partitionId)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpoch)
                        .setEndOffset(0);

        // Stub the replica manager, partition and future log.
        Mockito.when(BoxesRunTime.boxToInteger(partitionMock.partitionId()))
                .thenReturn(BoxesRunTime.boxToInteger(partitionId));
        Mockito.when(replicaManagerMock.futureLocalLogOrException(t1p0())).thenReturn(futureLogMock);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManagerMock.futureLogExists(t1p0())))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaManagerMock.onlinePartition(t1p0())).thenReturn(new Some(partitionMock));
        Mockito.when(replicaManagerMock.getPartitionOrException(t1p0())).thenReturn(partitionMock);
        Mockito.when(BoxesRunTime.boxToBoolean(quotaManagerMock.isQuotaExceeded()))
                .thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(partitionMock.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false))
                .thenReturn(epochEndOffset);
        Mockito.when(partitionMock.futureLocalLogOrException()).thenReturn(futureLogMock);
        ((Partition) Mockito.doNothing().when(partitionMock)).truncateTo(0L, true);
        Mockito.when(BoxesRunTime.boxToBoolean(partitionMock.maybeReplaceCurrentWithFutureReplica()))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToLong(futureLogMock.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(futureLogMock.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(futureLogMock.latestEpoch()).thenReturn(None$.MODULE$);

        // First fetch: issued with the stale epoch and answered with FENCED_LEADER_EPOCH.
        FullPartitionFetchMetadata staleEpochRequest = new FullPartitionFetchMetadata(
                0L, 0L, maxBytes, Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch - 1)), Optional.empty());
        FetchPartitionData fencedResponse = new FetchPartitionData(
                Errors.FENCED_LEADER_EPOCH, -1L, -1L, MemoryRecords.EMPTY,
                None$.MODULE$, None$.MODULE$, None$.MODULE$, None$.MODULE$, false);
        mockFetchFromCurrentLog(t1p0(), staleEpochRequest, config, replicaManagerMock, fencedResponse);

        BrokerEndPoint sourceBroker = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread", sourceBroker, config, failedPartitions(),
                replicaManagerMock, quotaManagerMock, new BrokerTopicStats());

        Tuple2 staleEpochEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, leaderEpoch - 1));
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{staleEpochEntry})));
        Assertions.assertTrue(thread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, thread.partitionCount());

        thread.doWork();

        // The fenced partition is marked failed and no longer tracked.
        Assertions.assertTrue(failedPartitions().contains(t1p0()));
        Assertions.assertEquals(None$.MODULE$, thread.fetchState(t1p0()));
        Assertions.assertEquals(0, thread.partitionCount());

        // Re-add the partition with the current epoch.
        Tuple2 currentEpochEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, leaderEpoch));
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{currentEpochEntry})));
        Assertions.assertEquals(
                new Some(BoxesRunTime.boxToInteger(leaderEpoch)),
                thread.fetchState(t1p0()).map(state -> BoxesRunTime.boxToInteger(state.currentLeaderEpoch())));
        Assertions.assertEquals(1, thread.partitionCount());

        // Second fetch: issued with the current epoch and answered without error.
        FullPartitionFetchMetadata currentEpochRequest = new FullPartitionFetchMetadata(
                0L, 0L, maxBytes, Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch)), Optional.empty());
        FetchPartitionData successResponse = new FetchPartitionData(
                Errors.NONE, 0L, 0L, MemoryRecords.EMPTY,
                None$.MODULE$, None$.MODULE$, None$.MODULE$, None$.MODULE$, false);
        mockFetchFromCurrentLog(t1p0(), currentEpochRequest, config, replicaManagerMock, successResponse);

        thread.doWork();

        Assertions.assertFalse(failedPartitions().contains(t1p0()));
        Assertions.assertEquals(None$.MODULE$, thread.fetchState(t1p0()));
        Assertions.assertEquals(0, thread.partitionCount());
    }

    /**
     * With the partition mock reporting that the current replica was replaced by the
     * future replica, a single {@code doWork()} round after a successful empty fetch
     * must leave the thread with no tracked partitions.
     */
    @Test
    public void shouldReplaceCurrentLogDirWhenCaughtUp() {
        final int partitionId = 0;
        final int leaderEpoch = 5;

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        Partition partitionMock = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManagerMock = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaManagerMock = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        AbstractLog futureLogMock = (AbstractLog) Mockito.mock(AbstractLog.class);

        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset =
                new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(partitionId)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpoch)
                        .setEndOffset(0);

        // Stub the replica manager, partition and future log.
        Mockito.when(BoxesRunTime.boxToInteger(partitionMock.partitionId()))
                .thenReturn(BoxesRunTime.boxToInteger(partitionId));
        Mockito.when(replicaManagerMock.futureLocalLogOrException(t1p0())).thenReturn(futureLogMock);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaManagerMock.futureLogExists(t1p0())))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaManagerMock.onlinePartition(t1p0())).thenReturn(new Some(partitionMock));
        Mockito.when(replicaManagerMock.getPartitionOrException(t1p0())).thenReturn(partitionMock);
        Mockito.when(BoxesRunTime.boxToBoolean(quotaManagerMock.isQuotaExceeded()))
                .thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(partitionMock.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false))
                .thenReturn(epochEndOffset);
        Mockito.when(partitionMock.futureLocalLogOrException()).thenReturn(futureLogMock);
        ((Partition) Mockito.doNothing().when(partitionMock)).truncateTo(0L, true);
        Mockito.when(BoxesRunTime.boxToBoolean(partitionMock.maybeReplaceCurrentWithFutureReplica()))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToLong(futureLogMock.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(futureLogMock.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(futureLogMock.latestEpoch()).thenReturn(None$.MODULE$);

        // The fetch from the current log succeeds with no records.
        FullPartitionFetchMetadata fetchRequest = new FullPartitionFetchMetadata(
                0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()),
                Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch)), Optional.empty());
        FetchPartitionData fetchResponse = new FetchPartitionData(
                Errors.NONE, 0L, 0L, MemoryRecords.EMPTY,
                None$.MODULE$, None$.MODULE$, None$.MODULE$, None$.MODULE$, false);
        mockFetchFromCurrentLog(t1p0(), fetchRequest, config, replicaManagerMock, fetchResponse);

        BrokerEndPoint sourceBroker = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread", sourceBroker, config, failedPartitions(),
                replicaManagerMock, quotaManagerMock, new BrokerTopicStats());

        Tuple2 partitionAndState = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, leaderEpoch));
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{partitionAndState})));
        Assertions.assertTrue(thread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, thread.partitionCount());

        thread.doWork();

        Assertions.assertEquals(None$.MODULE$, thread.fetchState(t1p0()));
        Assertions.assertEquals(0, thread.partitionCount());
    }

    /**
     * Stubs {@code replicaManager.fetchMessages(...)} on a Mockito mock so that a fetch for exactly
     * one partition, matching {@code topicPartition} and {@code partitionFetchMetadata}, is answered
     * through the captured response callback.
     *
     * @param topicPartition         partition the stubbed fetch must target
     * @param partitionFetchMetadata fetch metadata the stubbed fetch must carry (compared field by field)
     * @param kafkaConfig            source of {@code replicaFetchResponseMaxBytes}, used as an argument matcher
     * @param replicaManager         Mockito mock to stub
     * @param fetchPartitionData     response data handed to the answer helper
     */
    private void mockFetchFromCurrentLog(final TopicPartition topicPartition, final PartitionFetchMetadata partitionFetchMetadata, KafkaConfig kafkaConfig, ReplicaManager replicaManager, FetchPartitionData fetchPartitionData) {
        // Captures the response-callback argument of fetchMessages.
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Function1.class);
        // NOTE(review): null "outer instance" passed to the anonymous matcher below; looks like a
        // decompiler artifact of the Scala anonymous class - confirm before relying on it.
        final ReplicaAlterLogDirsThreadTest replicaAlterLogDirsThreadTest = null;
        // The mock is invoked directly with argument matchers; the following Mockito.when(...) call
        // then attaches the answer to this invocation, so the two statements must stay adjacent.
        replicaManager.fetchMessages(ArgumentMatchers.eq(0L), ArgumentMatchers.eq(Request$.MODULE$.FutureLocalReplicaId()), ArgumentMatchers.eq(0), BoxesRunTime.unboxToInt(ArgumentMatchers.eq(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(kafkaConfig.replicaFetchResponseMaxBytes())))), ArgumentMatchers.eq(false), (Seq) ArgumentMatchers.argThat(new ArgumentMatcher<Seq<Tuple2<TopicPartition, PartitionFetchMetadata>>>(replicaAlterLogDirsThreadTest, topicPartition, partitionFetchMetadata) { // from class: kafka.server.ReplicaAlterLogDirsThreadTest$$anon$1
            private final TopicPartition topicPartition$1;
            private final PartitionFetchMetadata requestData$1;

            // Accepts only a one-element sequence whose single (partition, metadata) pair equals the
            // expected partition and agrees on fetchOffset, currentLeaderEpoch, lastFetchedEpoch,
            // startOffset and maxBytes.
            public boolean matches(Seq<Tuple2<TopicPartition, PartitionFetchMetadata>> seq) {
                boolean z;
                boolean z2;
                // Split the sequence into head and tail; an empty sequence yields no match.
                Option unapply = package$.MODULE$.$plus$colon().unapply(seq);
                if (!unapply.isEmpty()) {
                    Tuple2 tuple2 = (Tuple2) ((Tuple2) unapply.get())._1();
                    Seq seq2 = (Seq) ((Tuple2) unapply.get())._2();
                    if (tuple2 != null) {
                        TopicPartition topicPartition2 = (TopicPartition) tuple2._1();
                        PartitionFetchMetadata partitionFetchMetadata2 = (PartitionFetchMetadata) tuple2._2();
                        Some unapplySeq = Seq$.MODULE$.unapplySeq(seq2);
                        // The tail must be empty, i.e. exactly one partition was requested.
                        if (!unapplySeq.isEmpty() && unapplySeq.get() != null && ((SeqLike) unapplySeq.get()).lengthCompare(0) == 0) {
                            TopicPartition topicPartition3 = this.topicPartition$1;
                            // Null-safe equality checks, as emitted for Scala's ==.
                            if (topicPartition2 != null ? topicPartition2.equals(topicPartition3) : topicPartition3 == null) {
                                if (partitionFetchMetadata2.fetchOffset() == this.requestData$1.fetchOffset()) {
                                    Optional currentLeaderEpoch = partitionFetchMetadata2.currentLeaderEpoch();
                                    Optional currentLeaderEpoch2 = this.requestData$1.currentLeaderEpoch();
                                    if (currentLeaderEpoch != null ? currentLeaderEpoch.equals(currentLeaderEpoch2) : currentLeaderEpoch2 == null) {
                                        Optional lastFetchedEpoch = partitionFetchMetadata2.lastFetchedEpoch();
                                        Optional lastFetchedEpoch2 = this.requestData$1.lastFetchedEpoch();
                                        if (lastFetchedEpoch != null ? lastFetchedEpoch.equals(lastFetchedEpoch2) : lastFetchedEpoch2 == null) {
                                            if (partitionFetchMetadata2.startOffset() == this.requestData$1.startOffset() && partitionFetchMetadata2.maxBytes() == this.requestData$1.maxBytes()) {
                                                z2 = true;
                                                z = z2;
                                                return z;
                                            }
                                        }
                                    }
                                }
                            }
                            // Single-element sequence, but some field differed.
                            z2 = false;
                            z = z2;
                            return z;
                        }
                    }
                }
                // Empty sequence, null head, or more than one element.
                z = false;
                return z;
            }

            {
                this.topicPartition$1 = topicPartition;
                this.requestData$1 = partitionFetchMetadata;
            }
        }), (ReplicaQuota) ArgumentMatchers.eq(QuotaFactory$UnboundedQuota$.MODULE$), (Function1) forClass.capture(), (IsolationLevel) ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED), (Option) ArgumentMatchers.eq(None$.MODULE$));
        // fetchMessages returns Unit, so the stub is registered via when(BoxedUnit.UNIT). The answer
        // delegates to a helper defined outside this view, passing the captor, the partition and the
        // response data. Presumably it invokes the captured callback with that response - TODO confirm.
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
            $anonfun$mockFetchFromCurrentLog$1(forClass, topicPartition, fetchPartitionData, invocationOnMock);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Verifies that {@code fetchEpochEndOffsets} answers a leader-epoch request for two partitions
     * with the values returned by the partitions obtained from the replica manager
     * ({@code lastOffsetForLeaderEpoch}), rather than from any remote source.
     *
     * <p>Each partition mock is stubbed with its own id. The previous version stubbed the first
     * mock's {@code partitionId()} twice, which overrode its id with 1 and left the second (strict)
     * mock without a {@code partitionId()} stub.
     */
    @Test
    public void issuesEpochRequestFromLocalReplica() {
        final int partitionT1p0Id = 0;
        final int partitionT1p1Id = 1;
        final int leaderEpochT1p0 = 2;
        final int leaderEpochT1p1 = 5;
        final long leoT1p0 = 13;
        final long leoT1p1 = 232;

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        Partition partitionT1p0 = (Partition) EasyMock.createMock(Partition.class);
        Partition partitionT1p1 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // Values the local partitions report for the requested epochs.
        OffsetForLeaderEpochResponseData.EpochEndOffset localOffsetT1p0 =
                new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(partitionT1p0Id)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpochT1p0)
                        .setEndOffset(leoT1p0);
        OffsetForLeaderEpochResponseData.EpochEndOffset localOffsetT1p1 =
                new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(partitionT1p1Id)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpochT1p1)
                        .setEndOffset(leoT1p1);

        // Stubs: each partition mock reports its own id (the second stub now targets partitionT1p1).
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p0.partitionId()))
                .andStubReturn(BoxesRunTime.boxToInteger(partitionT1p0Id));
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p1.partitionId()))
                .andStubReturn(BoxesRunTime.boxToInteger(partitionT1p1Id));
        EasyMock.expect(replicaManager.getPartitionOrException(t1p0())).andStubReturn(partitionT1p0);
        EasyMock.expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p0, false))
                .andReturn(localOffsetT1p0).anyTimes();
        EasyMock.expect(replicaManager.getPartitionOrException(t1p1())).andStubReturn(partitionT1p1);
        EasyMock.expect(partitionT1p1.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p1, false))
                .andReturn(localOffsetT1p1).anyTimes();
        EasyMock.replay(new Object[]{partitionT1p0, partitionT1p1, replicaManager});

        // Expected result, built from fresh instances so the comparison is by value.
        Tuple2 expectedT1p0 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p0()),
                new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(t1p0().partition())
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpochT1p0)
                        .setEndOffset(leoT1p0));
        Tuple2 expectedT1p1 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p1()),
                new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(t1p1().partition())
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpochT1p1)
                        .setEndOffset(leoT1p1));
        Object expected = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{expectedT1p0, expectedT1p1}));

        // The quota manager and broker topic stats are passed as null, as in the original test.
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), config,
                failedPartitions(), replicaManager, (ReplicationQuotaManager) null, (BrokerTopicStats) null);

        Tuple2 requestT1p0 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p0()),
                new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
                        .setPartition(t1p0().partition())
                        .setLeaderEpoch(leaderEpochT1p0));
        Tuple2 requestT1p1 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p1()),
                new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
                        .setPartition(t1p1().partition())
                        .setLeaderEpoch(leaderEpochT1p1));
        Object actual = thread.fetchEpochEndOffsets(
                Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{requestT1p0, requestT1p1})));

        Assertions.assertEquals(expected, actual,
                "results from leader epoch request should have offset from local replica");
    }

    /**
     * When looking up one of the requested partitions throws {@link KafkaStorageException},
     * {@code fetchEpochEndOffsets} must still return the epoch end offset for the healthy
     * partition and report KAFKA_STORAGE_ERROR for the failing one.
     */
    @Test
    public void fetchEpochsFromLeaderShouldHandleExceptionFromGetLocalReplica() {
        final int partitionId = 0;
        final int leaderEpoch = 2;

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        Partition partitionT1p0 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManagerMock = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        OffsetForLeaderEpochResponseData.EpochEndOffset localOffsetT1p0 =
                new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(partitionId)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpoch)
                        .setEndOffset(13);

        // t1p0 resolves normally; resolving t1p1 throws a storage exception once.
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p0.partitionId()))
                .andStubReturn(BoxesRunTime.boxToInteger(partitionId));
        EasyMock.expect(replicaManagerMock.getPartitionOrException(t1p0())).andStubReturn(partitionT1p0);
        EasyMock.expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false))
                .andReturn(localOffsetT1p0).anyTimes();
        EasyMock.expect(replicaManagerMock.getPartitionOrException(t1p1()))
                .andThrow(new KafkaStorageException()).once();
        EasyMock.replay(new Object[]{partitionT1p0, replicaManagerMock});

        Tuple2 expectedT1p0 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p0()),
                new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(t1p0().partition())
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpoch)
                        .setEndOffset(13));
        Tuple2 expectedT1p1 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p1()),
                new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(t1p1().partition())
                        .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()));
        Object expected = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{expectedT1p0, expectedT1p1}));

        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), config,
                failedPartitions(), replicaManagerMock, (ReplicationQuotaManager) null, (BrokerTopicStats) null);

        Tuple2 requestT1p0 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p0()),
                new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
                        .setPartition(t1p0().partition())
                        .setLeaderEpoch(leaderEpoch));
        Tuple2 requestT1p1 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(t1p1()),
                new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
                        .setPartition(t1p1().partition())
                        .setLeaderEpoch(leaderEpoch));
        Object actual = thread.fetchEpochEndOffsets(
                Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{requestT1p0, requestT1p1})));

        Assertions.assertEquals(expected, actual);
    }

    /**
     * Checks the offsets handed to {@code Partition.truncateTo} after a single {@code doWork()} pass
     * over two partitions.
     *
     * <p>Stubbed values: both future logs report log end offset 191 and latest epoch 2; the partition
     * for t1p0 answers the epoch lookup with end offset 190, the partition for t1p1 with end offset
     * 192. The asserted truncation offsets are 190 for t1p0 and 191 for t1p1, i.e. the smaller of the
     * two stubbed end offsets for each partition.
     */
    @Test
    public void shouldTruncateToReplicaOffset() {
        // One capture per partition; each records every offset passed to truncateTo.
        Capture truncateOffsetsT1p0 = EasyMock.newCapture(CaptureType.ALL);
        Capture truncateOffsetsT1p1 = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        AbstractLog log = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog secondLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLogT1p0 = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLogT1p1 = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partitionT1p0 = (Partition) EasyMock.createMock(Partition.class);
        Partition partitionT1p1 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>> fetchCallback = EasyMock.newCapture();

        // Partition / future-log lookups on the replica manager.
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p0.partitionId())).andStubReturn(BoxesRunTime.boxToInteger(0));
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p1.partitionId())).andStubReturn(BoxesRunTime.boxToInteger(1));
        EasyMock.expect(replicaManager.getPartitionOrException(t1p0())).andStubReturn(partitionT1p0);
        EasyMock.expect(replicaManager.getPartitionOrException(t1p1())).andStubReturn(partitionT1p1);
        EasyMock.expect(replicaManager.futureLocalLogOrException(t1p0())).andStubReturn(futureLogT1p0);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).andStubReturn(BoxesRunTime.boxToBoolean(true));
        EasyMock.expect(replicaManager.futureLocalLogOrException(t1p1())).andStubReturn(futureLogT1p1);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p1()))).andStubReturn(BoxesRunTime.boxToBoolean(true));

        // truncateTo returns Unit: the expect(...) that follows each call configures that last recorded
        // call, so each pair of statements must stay adjacent and in this order.
        partitionT1p0.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateOffsetsT1p0)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        partitionT1p1.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateOffsetsT1p1)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();

        EasyMock.expect(BoxesRunTime.boxToLong(futureLogT1p0.logEndOffset())).andReturn(BoxesRunTime.boxToLong(191)).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(futureLogT1p1.logEndOffset())).andReturn(BoxesRunTime.boxToLong(191)).anyTimes();

        // Epoch data for t1p0: future log ends epoch 2 at 191, the partition answers with 190.
        OffsetForLeaderEpochResponseData.EpochEndOffset replicaEndT1p0 = new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(2).setEndOffset(190);
        EasyMock.expect(futureLogT1p0.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(2))).anyTimes();
        EasyMock.expect(futureLogT1p0.endOffsetForEpoch(2)).andReturn(new Some(new OffsetAndEpoch(191, 2))).anyTimes();
        EasyMock.expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 2, false)).andReturn(replicaEndT1p0).anyTimes();

        // Epoch data for t1p1: future log ends epoch 2 at 191, the partition answers with 192.
        OffsetForLeaderEpochResponseData.EpochEndOffset replicaEndT1p1 = new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(1).setErrorCode(Errors.NONE.code()).setLeaderEpoch(2).setEndOffset(192);
        EasyMock.expect(futureLogT1p1.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(2))).anyTimes();
        EasyMock.expect(futureLogT1p1.endOffsetForEpoch(2)).andReturn(new Some(new OffsetAndEpoch(191, 2))).anyTimes();
        EasyMock.expect(partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 2, false)).andReturn(replicaEndT1p1).anyTimes();

        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stubWithFetchMessages(log, secondLog, futureLogT1p0, partitionT1p0, replicaManager, fetchCallback);

        EasyMock.replay(new Object[]{replicaManager, logManager, quotaManager, partitionT1p0, partitionT1p1, log, secondLog, futureLogT1p0, futureLogT1p1});

        // Exercise: register both partitions at fetch offset 0 and run one iteration.
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, brokerConfig, failedPartitions(), replicaManager, quotaManager, (BrokerTopicStats) null);
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, initialFetchState$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, initialFetchState$default$2()))})));
        thread.doWork();

        Assertions.assertEquals(190, BoxesRunTime.unboxToLong(truncateOffsetsT1p0.getValue()));
        Assertions.assertEquals(191, BoxesRunTime.unboxToLong(truncateOffsetsT1p1.getValue()));
    }

    /**
     * Runs two {@code doWork()} iterations against a future log whose latest epoch is first reported
     * as 5 and afterwards as 3, and asserts that 190 is among the offsets passed to
     * {@code Partition.truncateTo}.
     *
     * <p>Stubbed epoch lookups: the partition answers a request for epoch 5 with (epoch 4, end offset
     * 200) and a request for epoch 3 with (epoch 3, end offset 190); the future log answers
     * {@code endOffsetForEpoch(4)} with (195, epoch 3) and {@code endOffsetForEpoch(3)} with
     * (191, epoch 3).
     */
    @Test
    public void shouldTruncateToEndOffsetOfLargestCommonEpoch() {
        final int leaderEpoch = 5;

        // Records every offset passed to truncateTo.
        Capture truncateOffsets = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        AbstractLog log = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>> fetchCallback = EasyMock.newCapture();

        EasyMock.expect(BoxesRunTime.boxToInteger(partition.partitionId())).andStubReturn(BoxesRunTime.boxToInteger(0));
        EasyMock.expect(replicaManager.getPartitionOrException(t1p0())).andStubReturn(partition);
        EasyMock.expect(replicaManager.futureLocalLogOrException(t1p0())).andStubReturn(futureLog);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).andStubReturn(BoxesRunTime.boxToBoolean(true));

        // truncateTo returns Unit: the expect(...) right after it configures that recorded call.
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateOffsets)), EasyMock.eq(true));
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();

        EasyMock.expect(BoxesRunTime.boxToLong(futureLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(195)).anyTimes();

        // latestEpoch answers 5 on the first call and 3 on the next three; the order of these two
        // expectations is significant.
        EasyMock.expect(futureLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(leaderEpoch))).once();
        EasyMock.expect(futureLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(leaderEpoch - 2))).times(3);

        // Lookup for epoch 5: partition answers with epoch 4 / offset 200.
        OffsetForLeaderEpochResponseData.EpochEndOffset firstAnswer = new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch - 1).setEndOffset(200);
        EasyMock.expect(partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).andReturn(firstAnswer).anyTimes();
        EasyMock.expect(futureLog.endOffsetForEpoch(leaderEpoch - 1)).andReturn(new Some(new OffsetAndEpoch(195, leaderEpoch - 2))).anyTimes();

        // Lookup for epoch 3: partition answers with epoch 3 / offset 190.
        OffsetForLeaderEpochResponseData.EpochEndOffset secondAnswer = new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch - 2).setEndOffset(190);
        EasyMock.expect(partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch - 2, false)).andReturn(secondAnswer).anyTimes();
        EasyMock.expect(futureLog.endOffsetForEpoch(leaderEpoch - 2)).andReturn(new Some(new OffsetAndEpoch(191, leaderEpoch - 2))).anyTimes();

        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stubWithFetchMessages(log, null, futureLog, partition, replicaManager, fetchCallback);

        EasyMock.replay(new Object[]{replicaManager, logManager, quotaManager, partition, log, futureLog});

        // Exercise: register t1p0 at fetch offset 0 and run two iterations.
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, brokerConfig, failedPartitions(), replicaManager, quotaManager, (BrokerTopicStats) null);
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, initialFetchState$default$2()))})));
        thread.doWork();
        thread.doWork();

        boolean truncatedTo190 = ((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(truncateOffsets.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(190));
        String failureMessage = "Expected offset " + 190 + " in captured truncation offsets " + truncateOffsets.getValues();
        Assertions.assertTrue(truncatedTo190, failureMessage);
    }

    /**
     * With the future log's {@code latestEpoch()} stubbed to {@code None} and the partition
     * registered at initial fetch offset 100, asserts that one {@code doWork()} iteration passes
     * offset 100 to {@code Partition.truncateTo}.
     */
    @Test
    public void shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset() {
        // Records every offset passed to truncateTo.
        Capture truncateOffsets = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        AbstractLog log = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>> fetchCallback = EasyMock.newCapture();

        EasyMock.expect(replicaManager.getPartitionOrException(t1p0())).andStubReturn(partition);

        // truncateTo returns Unit: the expect(...) right after it configures that recorded call.
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateOffsets)), EasyMock.eq(true));
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();

        EasyMock.expect(replicaManager.futureLocalLogOrException(t1p0())).andStubReturn(futureLog);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).andStubReturn(BoxesRunTime.boxToBoolean(true));
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();

        // The future log reports no latest epoch at all.
        EasyMock.expect(futureLog.latestEpoch()).andReturn(None$.MODULE$).anyTimes();

        stubWithFetchMessages(log, null, futureLog, partition, replicaManager, fetchCallback);

        EasyMock.replay(new Object[]{replicaManager, logManager, quotaManager, partition, log, futureLog});

        // Exercise: register t1p0 at fetch offset 100 and run one iteration.
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, brokerConfig, failedPartitions(), replicaManager, quotaManager, (BrokerTopicStats) null);
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(100, initialFetchState$default$2()))})));
        thread.doWork();

        Assertions.assertEquals(100, BoxesRunTime.unboxToLong(truncateOffsets.getValue()), "Expected future replica to truncate to initial fetch offset if replica returns UNDEFINED_EPOCH_OFFSET");
    }

    /**
     * The partition's epoch lookup is stubbed to answer {@code REPLICA_NOT_AVAILABLE} three times and
     * then a successful response (epoch 1, end offset 300). The test asserts that the first three
     * {@code doWork()} iterations capture no {@code truncateTo} call, and that after a fourth
     * iteration the captured truncation offset is 290 (the future log's stubbed end offset).
     */
    @Test
    public void shouldPollIndefinitelyIfReplicaNotAvailable() {
        // Records every offset passed to truncateTo.
        Capture truncateOffsets = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        AbstractLog localLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        Capture fetchCallback = EasyMock.newCapture();

        EasyMock.expect(BoxesRunTime.boxToInteger(partition.partitionId())).andStubReturn(BoxesRunTime.boxToInteger(0));
        EasyMock.expect(replicaManager.getPartitionOrException(t1p0())).andStubReturn(partition);

        // truncateTo returns Unit: the expect(...) right after it configures that recorded call
        // (expected exactly once here).
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateOffsets)), EasyMock.eq(true));
        EasyMock.expect(BoxedUnit.UNIT).once();

        EasyMock.expect(replicaManager.futureLocalLogOrException(t1p0())).andStubReturn(futureLog);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).andStubReturn(BoxesRunTime.boxToBoolean(true));
        EasyMock.expect(BoxesRunTime.boxToLong(futureLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(290)).anyTimes();
        EasyMock.expect(futureLog.latestEpoch()).andStubReturn(new Some(BoxesRunTime.boxToInteger(1)));
        EasyMock.expect(futureLog.endOffsetForEpoch(1)).andReturn(new Some(new OffsetAndEpoch(290, 1)));
        EasyMock.expect(replicaManager.localLog(t1p0())).andReturn(new Some(localLog)).anyTimes();

        // Three "not available" answers, then one successful answer.
        OffsetForLeaderEpochResponseData.EpochEndOffset notAvailable = new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code());
        OffsetForLeaderEpochResponseData.EpochEndOffset available = new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(1).setEndOffset(300);
        EasyMock.expect(partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 1, false)).andReturn(notAvailable).times(3).andReturn(available);

        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();

        // fetchMessages returns Unit as well; its answer is delegated to the compiler-generated helper,
        // which receives the captured callback.
        replicaManager.fetchMessages(EasyMock.anyLong(), EasyMock.anyInt(), EasyMock.anyInt(), EasyMock.anyInt(), BoxesRunTime.unboxToBoolean(EasyMock.anyObject()), (Seq) EasyMock.anyObject(), (ReplicaQuota) EasyMock.anyObject(), (Function1) EasyMock.capture(fetchCallback), (IsolationLevel) EasyMock.anyObject(), (Option) EasyMock.anyObject());
        EasyMock.expect(BoxedUnit.UNIT).andAnswer(() -> {
            $anonfun$shouldPollIndefinitelyIfReplicaNotAvailable$1(fetchCallback);
            return BoxedUnit.UNIT;
        }).anyTimes();

        EasyMock.replay(new Object[]{replicaManager, logManager, quotaManager, partition, localLog, futureLog});

        // Exercise: register t1p0 at fetch offset 0.
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, brokerConfig, failedPartitions(), replicaManager, quotaManager, (BrokerTopicStats) null);
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, initialFetchState$default$2()))})));

        // Iterations 0..2 (inclusive): no truncation may have been captured yet.
        for (int attempt = 0; attempt <= 2; attempt++) {
            thread.doWork();
        }
        Assertions.assertEquals(0, truncateOffsets.getValues().size());

        // Fourth iteration: truncation is captured.
        thread.doWork();
        Assertions.assertEquals(290, BoxesRunTime.unboxToLong(truncateOffsets.getValue()));
    }

    /**
     * Records exactly one {@code lastOffsetForLeaderEpoch} call and exactly one
     * {@code truncateTo(190, true)} call on the strict partition mock, runs {@code doWork()} four
     * times, and then verifies the partition mock.
     */
    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnly() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        AbstractLog log = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>> fetchCallback = EasyMock.newCapture();

        EasyMock.expect(BoxesRunTime.boxToInteger(partition.partitionId())).andStubReturn(BoxesRunTime.boxToInteger(0));
        EasyMock.expect(replicaManager.getPartitionOrException(t1p0())).andStubReturn(partition);

        // No times(...) qualifier: this epoch lookup is expected a single time.
        OffsetForLeaderEpochResponseData.EpochEndOffset replicaEnd = new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5).setEndOffset(213);
        EasyMock.expect(partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 5, false)).andReturn(replicaEnd);

        // truncateTo returns Unit: the expect(...) right after it configures that recorded call.
        partition.truncateTo(190, true);
        EasyMock.expect(BoxedUnit.UNIT).once();

        EasyMock.expect(replicaManager.futureLocalLogOrException(t1p0())).andStubReturn(futureLog);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0()))).andStubReturn(BoxesRunTime.boxToBoolean(true));
        EasyMock.expect(futureLog.latestEpoch()).andStubReturn(new Some(BoxesRunTime.boxToInteger(5)));
        EasyMock.expect(BoxesRunTime.boxToLong(futureLog.logEndOffset())).andStubReturn(BoxesRunTime.boxToLong(190));
        EasyMock.expect(futureLog.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(190, 5)));
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stubWithFetchMessages(log, null, futureLog, partition, replicaManager, fetchCallback);

        EasyMock.replay(new Object[]{replicaManager, logManager, quotaManager, partition, log, futureLog});

        // Exercise: register t1p0 at fetch offset 0 and run iterations 0..3 (inclusive).
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, brokerConfig, failedPartitions(), replicaManager, quotaManager, (BrokerTopicStats) null);
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, initialFetchState$default$2()))})));
        for (int iteration = 0; iteration <= 3; iteration++) {
            thread.doWork();
        }

        EasyMock.verify(new Object[]{partition});
    }

    /**
     * Builds a fetch for two partitions that are both in the {@code Fetching} state (t1p0 at offset
     * 150, t1p1 at offset 160) and asserts that the resulting request has {@code minBytes} 0 and
     * contains exactly one topic with exactly one partition: t1p0 at fetch offset 150.
     */
    @Test
    public void shouldFetchOneReplicaAtATime() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));

        // Mocks.
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog abstractLog2 = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // Stubs.
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stub(abstractLog, null, abstractLog2, partition, replicaManager);

        // Every mock handed to stub(...) is switched to replay state, including abstractLog2, so that
        // no call made while the thread runs lands on a mock that is still recording. This matches the
        // replay list used by shouldFetchNonDelayedAndNonTruncatingReplicas.
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog, abstractLog2});

        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), fromProps, failedPartitions(), replicaManager, replicationQuotaManager, (BrokerTopicStats) null);
        replicaAlterLogDirsThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));

        // Both partitions are fetchable: t1p0 at offset 150, t1p1 at offset 160.
        AbstractFetcherThread.ResultWithPartitions buildFetch = replicaAlterLogDirsThread.buildFetch(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new PartitionFetchState(150L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(160L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$))})));
        // Residue of the Scala pattern match that destructures the result.
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) buildFetch.result();
        Map partitionsWithError = buildFetch.partitionsWithError();

        // A fetch was built and no partition was reported in error.
        Assertions.assertTrue(option.isDefined());
        FetchRequest.Builder fetchRequest = ((AbstractFetcherThread.ReplicaFetch) option.get()).fetchRequest();
        Assertions.assertFalse(fetchRequest.fetchData().isEmpty());
        Assertions.assertFalse(partitionsWithError.nonEmpty());

        // The request covers a single topic/partition: t1p0 at offset 150.
        FetchRequest build = fetchRequest.build();
        Assertions.assertEquals(0, build.minBytes());
        Assertions.assertEquals(1, build.data().topics().size());
        FetchRequestData.FetchTopic fetchTopic = (FetchRequestData.FetchTopic) build.data().topics().get(0);
        Assertions.assertEquals(t1p0().topic(), fetchTopic.topic(), "Expected fetch request for first partition");
        Assertions.assertEquals(1, fetchTopic.partitions().size());
        Assertions.assertEquals(t1p0().partition(), ((FetchRequestData.FetchPartition) fetchTopic.partitions().get(0)).partition(), "Expected fetch request for first partition");
        Assertions.assertEquals(150L, ((FetchRequestData.FetchPartition) fetchTopic.partitions().get(0)).fetchOffset());
    }

    @Test
    public void shouldFetchNonDelayedAndNonTruncatingReplicas() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog abstractLog2 = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog2.logStartOffset())).andReturn(BoxesRunTime.boxToLong(123)).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stub(abstractLog, null, abstractLog2, partition, replicaManager);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog, abstractLog2});
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), fromProps, failedPartitions(), replicaManager, replicationQuotaManager, (BrokerTopicStats) null);
        replicaAlterLogDirsThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));
        AbstractFetcherThread.ResultWithPartitions buildFetch = replicaAlterLogDirsThread.buildFetch(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), PartitionFetchState$.MODULE$.apply(150L, None$.MODULE$, 1, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), PartitionFetchState$.MODULE$.apply(160L, None$.MODULE$, 1, Truncating$.MODULE$, None$.MODULE$))})));
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) buildFetch.result();
        Map partitionsWithError = buildFetch.partitionsWithError();
        Assertions.assertTrue(option.isDefined());
        AbstractFetcherThread.ReplicaFetch replicaFetch = (AbstractFetcherThread.ReplicaFetch) option.get();
        Assertions.assertFalse(replicaFetch.partitionData().isEmpty());
        Assertions.assertFalse(partitionsWithError.nonEmpty());
        FetchRequest build = replicaFetch.fetchRequest().build();
        Assertions.assertEquals(1, build.data().topics().size());
        FetchRequestData.FetchTopic fetchTopic = (FetchRequestData.FetchTopic) build.data().topics().get(0);
        Assertions.assertEquals(t1p0().topic(), fetchTopic.topic(), "Expected fetch request for non-truncating partition");
        Assertions.assertEquals(1, fetchTopic.partitions().size());
        Assertions.assertEquals(t1p0().partition(), ((FetchRequestData.FetchPartition) fetchTopic.partitions().get(0)).partition(), "Expected fetch request for non-truncating partition");
        Assertions.assertEquals(150L, ((FetchRequestData.FetchPartition) fetchTopic.partitions().get(0)).fetchOffset());
        AbstractFetcherThread.ResultWithPartitions buildFetch2 = replicaAlterLogDirsThread.buildFetch(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), PartitionFetchState$.MODULE$.apply(140L, None$.MODULE$, 1, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(160L, None$.MODULE$, 1, new Some(new DelayedItem(5000L)), Fetching$.MODULE$, None$.MODULE$))})));
        if (buildFetch2 == null) {
            throw new MatchError((Object) null);
        }
        Option option2 = (Option) buildFetch2.result();
        Map partitionsWithError2 = buildFetch2.partitionsWithError();
        Assertions.assertTrue(option2.isDefined());
        AbstractFetcherThread.ReplicaFetch replicaFetch2 = (AbstractFetcherThread.ReplicaFetch) option2.get();
        Assertions.assertFalse(replicaFetch2.partitionData().isEmpty());
        Assertions.assertFalse(partitionsWithError2.nonEmpty());
        FetchRequest build2 = replicaFetch2.fetchRequest().build();
        Assertions.assertEquals(1, build2.data().topics().size());
        FetchRequestData.FetchTopic fetchTopic2 = (FetchRequestData.FetchTopic) build2.data().topics().get(0);
        Assertions.assertEquals(t1p0().topic(), fetchTopic2.topic(), "Expected fetch request for non-delayed partition");
        Assertions.assertEquals(1, fetchTopic2.partitions().size());
        Assertions.assertEquals(t1p0().partition(), ((FetchRequestData.FetchPartition) fetchTopic2.partitions().get(0)).partition(), "Expected fetch request for non-delayed partition");
        Assertions.assertEquals(140L, ((FetchRequestData.FetchPartition) fetchTopic2.partitions().get(0)).fetchOffset());
        AbstractFetcherThread.ResultWithPartitions buildFetch3 = replicaAlterLogDirsThread.buildFetch(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new PartitionFetchState(140L, None$.MODULE$, 1, new Some(new DelayedItem(5000L)), Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(160L, None$.MODULE$, 1, new Some(new DelayedItem(5000L)), Fetching$.MODULE$, None$.MODULE$))})));
        if (buildFetch3 == null) {
            throw new MatchError((Object) null);
        }
        Option option3 = (Option) buildFetch3.result();
        Map partitionsWithError3 = buildFetch3.partitionsWithError();
        Assertions.assertTrue(option3.isEmpty(), "Expected no fetch requests since all partitions are delayed");
        Assertions.assertFalse(partitionsWithError3.nonEmpty());
    }

    /**
     * Records the EasyMock expectations shared by the tests for the two test
     * partitions {@code t1p0()} and {@code t1p1()} on the given replica manager mock.
     *
     * <p>For each partition the mock is told to return the partition's current log
     * (wrapped in {@code Some} for {@code localLog}, bare for
     * {@code localLogOrException}), the shared future log, {@code true} for
     * {@code futureLogExists}, and {@code Some(partition)} for {@code onlinePartition}.
     *
     * @param abstractLog    log returned for {@code t1p0()}
     * @param abstractLog2   log returned for {@code t1p1()}
     * @param abstractLog3   future log returned for both partitions
     * @param partition      partition returned by {@code onlinePartition} for both partitions
     * @param replicaManager mock on which the expectations are recorded
     * @return the expectation setter of the last recorded expectation
     *         ({@code onlinePartition(t1p1())})
     */
    public IExpectationSetters<Option<Partition>> stub(AbstractLog abstractLog, AbstractLog abstractLog2, AbstractLog abstractLog3, Partition partition, ReplicaManager replicaManager) {
        final Boolean futureLogPresent = BoxesRunTime.boxToBoolean(true);

        // Expectations for t1p0.
        final Option currentLogOfP0 = new Some(abstractLog);
        EasyMock.expect(replicaManager.localLog(t1p0()))
                .andReturn(currentLogOfP0)
                .anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(t1p0()))
                .andReturn(abstractLog)
                .anyTimes();
        EasyMock.expect(replicaManager.futureLocalLogOrException(t1p0()))
                .andReturn(abstractLog3)
                .anyTimes();
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p0())))
                .andStubReturn(futureLogPresent);
        final Option onlinePartitionOfP0 = new Some(partition);
        EasyMock.expect(replicaManager.onlinePartition(t1p0()))
                .andReturn(onlinePartitionOfP0)
                .anyTimes();

        // Expectations for t1p1.
        final Option currentLogOfP1 = new Some(abstractLog2);
        EasyMock.expect(replicaManager.localLog(t1p1()))
                .andReturn(currentLogOfP1)
                .anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(t1p1()))
                .andReturn(abstractLog2)
                .anyTimes();
        EasyMock.expect(replicaManager.futureLocalLogOrException(t1p1()))
                .andReturn(abstractLog3)
                .anyTimes();
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaManager.futureLogExists(t1p1())))
                .andStubReturn(futureLogPresent);
        final Option onlinePartitionOfP1 = new Some(partition);
        return EasyMock.expect(replicaManager.onlinePartition(t1p1()))
                .andReturn(onlinePartitionOfP1)
                .anyTimes();
    }

    /**
     * Records the common expectations via {@link #stub} and additionally an
     * expectation for {@code replicaManager.fetchMessages} that accepts any
     * arguments and captures the {@code Function1} argument into {@code capture}.
     *
     * <p>Each matching call is answered by applying the captured function to an
     * empty list (see {@code $anonfun$stubWithFetchMessages$1}).
     *
     * @param abstractLog    log returned for {@code t1p0()}
     * @param abstractLog2   log returned for {@code t1p1()}
     * @param abstractLog3   future log returned for both partitions
     * @param partition      partition returned by {@code onlinePartition}
     * @param replicaManager mock on which the expectations are recorded
     * @param capture        capture receiving the function passed to {@code fetchMessages}
     * @return the expectation setter for the recorded {@code fetchMessages} call
     */
    public IExpectationSetters<BoxedUnit> stubWithFetchMessages(AbstractLog abstractLog, AbstractLog abstractLog2, AbstractLog abstractLog3, Partition partition, ReplicaManager replicaManager, Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>> capture) {
        stub(abstractLog, abstractLog2, abstractLog3, partition, replicaManager);

        // Record the void call; matchers are registered in argument order.
        replicaManager.fetchMessages(
                EasyMock.anyLong(),
                EasyMock.anyInt(),
                EasyMock.anyInt(),
                EasyMock.anyInt(),
                BoxesRunTime.unboxToBoolean(EasyMock.anyObject()),
                (Seq) EasyMock.anyObject(),
                (ReplicaQuota) EasyMock.anyObject(),
                (Function1) EasyMock.capture(capture),
                (IsolationLevel) EasyMock.anyObject(),
                (Option) EasyMock.anyObject());

        // Attach the answer to the call recorded just above.
        final IExpectationSetters<BoxedUnit> fetchExpectation = EasyMock.expect(BoxedUnit.UNIT);
        return fetchExpectation
                .andAnswer(() -> {
                    $anonfun$stubWithFetchMessages$1(capture);
                    return BoxedUnit.UNIT;
                })
                .anyTimes();
    }

    /**
     * Synthetic lambda body: applies the function held by {@code argumentCaptor}
     * to a single-element list containing the pair
     * {@code (topicPartition, fetchPartitionData)}.
     *
     * @param argumentCaptor     captor whose current value is the function to invoke
     * @param topicPartition     first element of the pair
     * @param fetchPartitionData second element of the pair
     * @param invocationOnMock   unused in this body
     */
    public static final /* synthetic */ void $anonfun$mockFetchFromCurrentLog$1(ArgumentCaptor argumentCaptor, TopicPartition topicPartition, FetchPartitionData fetchPartitionData, InvocationOnMock invocationOnMock) {
        final Function1 capturedFunction = (Function1) argumentCaptor.getValue();
        final Tuple2 entry = new Tuple2(topicPartition, fetchPartitionData);
        capturedFunction.apply(new $colon.colon(entry, Nil$.MODULE$));
    }

    /**
     * Synthetic lambda body: applies the function held by {@code capture} to an
     * empty list.
     *
     * @param capture capture whose value is the function to invoke
     */
    public static final /* synthetic */ void $anonfun$shouldPollIndefinitelyIfReplicaNotAvailable$1(Capture capture) {
        final Function1 capturedFunction = (Function1) capture.getValue();
        capturedFunction.apply(Nil$.MODULE$);
    }

    /**
     * Synthetic lambda body used as the answer in {@code stubWithFetchMessages}:
     * applies the function held by {@code capture} to an empty list.
     *
     * @param capture capture whose value is the function to invoke
     */
    public static final /* synthetic */ void $anonfun$stubWithFetchMessages$1(Capture capture) {
        final Function1 capturedFunction = (Function1) capture.getValue();
        capturedFunction.apply(Nil$.MODULE$);
    }
}
