package kafka.server;

import com.yammer.metrics.core.Meter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.SystemTime;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: ReplicaFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015e\u0001B\u0011#\u0001\u001dBQA\f\u0001\u0005\u0002=BqA\r\u0001C\u0002\u0013%1\u0007\u0003\u0004@\u0001\u0001\u0006I\u0001\u000e\u0005\b\u0001\u0002\u0011\r\u0011\"\u00034\u0011\u0019\t\u0005\u0001)A\u0005i!9!\t\u0001b\u0001\n\u0013\u0019\u0004BB\"\u0001A\u0003%A\u0007C\u0004E\u0001\t\u0007I\u0011B#\t\r1\u0003\u0001\u0015!\u0003G\u0011\u001di\u0005A1A\u0005\n9CaA\u0015\u0001!\u0002\u0013y\u0005\"B*\u0001\t\u0013!\u0006b\u00022\u0001#\u0003%Ia\u0019\u0005\u0006]\u0002!\ta\u001c\u0005\u0006u\u0002!\ta\u001c\u0005\u0006\u007f\u0002!\ta\u001c\u0005\b\u0003\u0007\u0001A\u0011AA\u0003\u0011\u0019\t\u0019\u0003\u0001C\u0001_\"1\u0011q\u0005\u0001\u0005\u0002=Da!a\u000b\u0001\t\u0003y\u0007BBA\u0018\u0001\u0011\u0005q\u000e\u0003\u0004\u00024\u0001!\ta\u001c\u0005\u0007\u0003o\u0001A\u0011A8\t\r\u0005m\u0002\u0001\"\u0001p\u0011\u0019\ty\u0004\u0001C\u0001_\"1\u00111\t\u0001\u0005\u0002=Da!a\u0012\u0001\t\u0003y\u0007BBA&\u0001\u0011\u0005q\u000e\u0003\u0004\u0002P\u0001!\ta\u001c\u0005\u0007\u0003'\u0002A\u0011A8\t\u000f\u0005]\u0003\u0001\"\u0003\u0002Z!9\u0011q\f\u0001\u0005\u0002\u0005\u0005$\u0001\u0007*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\rV3ti*\u00111\u0005J\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003\u0015\nQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001QA\u0011\u0011\u0006L\u0007\u0002U)\t1&A\u0003tG\u0006d\u0017-\u0003\u0002.U\t1\u0011I\\=SK\u001a\fa\u0001P5oSRtD#\u0001\u0019\u0011\u0005E\u0002Q\"\u0001\u0012\u0002\tQ\f\u0004\u000fM\u000b\u0002iA\u0011Q'P\u0007\u0002m)\u0011q\u0007O\u0001\u0007G>lWn\u001c8\u000b\u0005\u0015J$B\u0001\u001e<\u0003\u0019\t\u0007/Y2iK*\tA(A\u0002pe\u001eL!A\u0010\u001c\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\u0006)A/\r91A\u0005!A/\r92\u0003\u0015!\u0018\u0007]\u0019!\u0003\u0011!(\u0007]\u0019\u0002\u000bQ\u0014\u0004/\r\u0011\u0002\u001d\t\u0014xn[3s\u000b:$\u0007k\\5oiV\ta\t\u0005\u0002H\u00156\t\u0001J\u0003\u0002JI\u000591\r\\;ti\u0016\u0014\u0018BA&I\u00059\u0011%o\\6fe\u0016
sG\rU8j]R\fqB\u0019:pW\u0016\u0014XI\u001c3Q_&tG\u000fI\u0001\u0011M\u0006LG.\u001a3QCJ$\u0018\u000e^5p]N,\u0012a\u0014\t\u0003cAK!!\u0015\u0012\u0003!\u0019\u000b\u0017\u000e\\3e!\u0006\u0014H/\u001b;j_:\u001c\u0018!\u00054bS2,G\rU1si&$\u0018n\u001c8tA\u0005qqN\u001a4tKR\fe\u000eZ#q_\u000eDGcA+Y;B\u0011\u0011GV\u0005\u0003/\n\u0012ab\u00144gg\u0016$\u0018I\u001c3Fa>\u001c\u0007\u000eC\u0003Z\u0019\u0001\u0007!,A\u0006gKR\u001c\u0007n\u00144gg\u0016$\bCA\u0015\\\u0013\ta&F\u0001\u0003M_:<\u0007b\u00020\r!\u0003\u0005\raX\u0001\fY\u0016\fG-\u001a:Fa>\u001c\u0007\u000e\u0005\u0002*A&\u0011\u0011M\u000b\u0002\u0004\u0013:$\u0018\u0001G8gMN,G/\u00118e\u000bB|7\r\u001b\u0013eK\u001a\fW\u000f\u001c;%eU\tAM\u000b\u0002`K.\na\r\u0005\u0002hY6\t\u0001N\u0003\u0002jU\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0003W*\n!\"\u00198o_R\fG/[8o\u0013\ti\u0007NA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\fqa\u00197fC:,\b\u000fF\u0001q!\tI\u0013/\u0003\u0002sU\t!QK\\5uQ\tqA\u000f\u0005\u0002vq6\taO\u0003\u0002xw\u0005)!.\u001e8ji&\u0011\u0011P\u001e\u0002\u0006\u0003\u001a$XM]\u0001)g\"|W\u000f\u001c3TK:$G*\u0019;fgR\u0014V-];fgR4VM]:j_:\u001c()\u001f#fM\u0006,H\u000e\u001e\u0015\u0003\u001fq\u0004\"!^?\n\u0005y4(\u0001\u0002+fgR\f\u0001i\u001d5pk2$g)\u001a;dQ2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f^%g\u0019\u0006\u001cH/\u00129pG\"$UMZ5oK\u00124uN]*p[\u0016\u0004\u0016M\u001d;ji&|gn\u001d\u0015\u0003!q\fQ#Y:tKJ$\b+\u0019:uSRLwN\\*uCR,7\u000fF\u0005q\u0003\u000f\t\t\"a\u0007\u0002 
!9\u0011\u0011B\tA\u0002\u0005-\u0011a\u00024fi\u000eDWM\u001d\t\u0004c\u00055\u0011bAA\bE\t)\u0012IY:ue\u0006\u001cGOR3uG\",'\u000f\u00165sK\u0006$\u0007bBA\n#\u0001\u0007\u0011QC\u0001\u0016g\"|W\u000f\u001c3CKJ+\u0017\rZ=G_J4U\r^2i!\rI\u0013qC\u0005\u0004\u00033Q#a\u0002\"p_2,\u0017M\u001c\u0005\b\u0003;\t\u0002\u0019AA\u000b\u0003U\u0019\bn\\;mI\n+GK];oG\u0006$\u0018N\\4M_\u001eDq!!\t\u0012\u0001\u0004\t)\"A\btQ>,H\u000e\u001a\"f\t\u0016d\u0017-_3e\u0003\u0015\u001a\bn\\;mI\"\u000bg\u000e\u001a7f\u000bb\u001cW\r\u001d;j_:4%o\\7CY>\u001c7.\u001b8h'\u0016tG\r\u000b\u0002\u0013y\u0006q4\u000f[8vY\u00124U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"|e\u000e\\=JM2+\u0017\rZ3s\u000bB|7\r[&o_^tGk\u001c\"pi\"D#a\u0005?\u0002iMDw.\u001e7e)J,hnY1uKR{wJ\u001a4tKR\u001c\u0006/Z2jM&,G-\u00138Fa>\u001c\u0007n\u00144gg\u0016$(+Z:q_:\u001cX\r\u000b\u0002\u0015y\u0006i5\u000f[8vY\u0012$&/\u001e8dCR,Gk\\(gMN,Go\u00159fG&4\u0017.\u001a3J]\u0016\u0003xn\u00195PM\u001a\u001cX\r\u001e*fgB|gn]3JM\u001a{G\u000e\\8xKJD\u0015m\u001d(p\u001b>\u0014X-\u00129pG\"\u001c\bFA\u000b}\u0003)\u001b\bn\\;mI\u001a+Go\u00195MK\u0006$WM]#q_\u000eD7+Z2p]\u0012$\u0016.\\3JM2+\u0017\rZ3s%\u0016\u0004H.[3t/&$\b.\u00129pG\"tu\u000e^&o_^tGk\u001c$pY2|w/\u001a:)\u0005Ya\u0018aM:i_VdG-V:f\u0019\u0016\fG-\u001a:F]\u0012|eMZ:fi&3\u0017J\u001c;fe\n\u0013xn[3s-\u0016\u00148/[8o\u0005\u0016dwn\u001e\u001a1Q\t9B0\u0001!tQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_&s\u0017\u000e^5bY\u001a+Go\u00195PM\u001a\u001cX\r^%g\u0019\u0016\fG-\u001a:SKR,(O\\:V]\u0012,g-\u001b8fI>3gm]3uQ\tAB0A\u0019tQ>,H\u000e\u001a)pY2Le\u000eZ3gS:LG/\u001a7z\u0013\u001adU-\u00193feJ+G/\u001e:og\u0006s\u00170\u0012=dKB$\u0018n\u001c8)\u0005ea\u0018aK:i_VdG-T8wKB\u000b'\u000f^5uS>t7oT;u\u001f\u001a$&/\u001e8dCRLgn\u001a'pON#\u0018\r^3)\u0005ia\u0018\u0001O:i_VdGMR5mi\u0016\u0014\b+\u0019:uSRLwN\\:NC\u0012,G*Z1eKJ$UO]5oO2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f\u001e\u0015\u00037q\f\u0001j\u001d5pk2$7)\u0019;dQ\u0016C8-\u001a
9uS>tgI]8n\u00052|7m[5oON+g\u000eZ,iK:\u001c\u0006.\u001e;uS:<Gi\\<o%\u0016\u0004H.[2b\r\u0016$8\r[3s)\"\u0014X-\u00193)\u0005qa\u0018AJ:i_VdG-\u00169eCR,'+Z1tg&<g.\\3oi\nKH/Z:J]6+GO]5dg\"\u0012Q\u0004`\u0001Gg\"|W\u000f\u001c3O_R,\u0006\u000fZ1uKJ+\u0017m]:jO:lWM\u001c;CsR,7/\u00138NKR\u0014\u0018nY:XQ\u0016tgj\u001c*fCN\u001c\u0018n\u001a8nK:$8/\u00138Qe><'/Z:tQ\tqB0\u0001\u0010bgN,'\u000f\u001e)s_\u000e,7o\u001d)beRLG/[8o\t\u0006$\u0018m\u00165f]R\u0019\u0001/a\u0017\t\u000f\u0005us\u00041\u0001\u0002\u0016\u0005i\u0011n\u001d*fCN\u001c\u0018n\u001a8j]\u001e\fAa\u001d;vER9\u0001/a\u0019\u0002n\u0005]\u0004bBA3A\u0001\u0007\u0011qM\u0001\na\u0006\u0014H/\u001b;j_:\u00042aRA5\u0013\r\tY\u0007\u0013\u0002\n!\u0006\u0014H/\u001b;j_:Dq!a\u001c!\u0001\u0004\t\t(\u0001\bsKBd\u0017nY1NC:\fw-\u001a:\u0011\u0007E\n\u0019(C\u0002\u0002v\t\u0012aBU3qY&\u001c\u0017-T1oC\u001e,'\u000fC\u0004\u0002z\u0001\u0002\r!a\u001f\u0002\u00071|w\r\u0005\u0003\u0002~\u0005\u0005UBAA@\u0015\r\tI\bJ\u0005\u0005\u0003\u0007\u000byHA\u0006BEN$(/Y2u\u0019><\u0007")
/* loaded from: input_file:kafka/server/ReplicaFetcherThreadTest.class */
public class ReplicaFetcherThreadTest {
    // Topic-partition fixtures used by the tests in this class: partitions 0 and 1 of "topic1"
    // and partition 1 of "topic2".
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    // Broker endpoint fixture passed to the fetcher threads and mock senders built by the tests
    // (presumably broker id 0 at localhost:1000 -- confirm against BrokerEndPoint's constructor).
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    // Failed-partitions tracker fixture passed to the fetcher threads built by the tests.
    private final FailedPartitions failedPartitions = new FailedPartitions();

    /** Returns the "topic1" / partition 0 fixture held in the {@code t1p0} field. */
    private TopicPartition t1p0() {
        return t1p0;
    }

    /** Returns the "topic1" / partition 1 fixture held in the {@code t1p1} field. */
    private TopicPartition t1p1() {
        return t1p1;
    }

    /** Returns the "topic2" / partition 1 fixture held in the {@code t2p1} field. */
    private TopicPartition t2p1() {
        return t2p1;
    }

    /** Returns the shared broker endpoint fixture held in the {@code brokerEndPoint} field. */
    private BrokerEndPoint brokerEndPoint() {
        return brokerEndPoint;
    }

    /** Returns the shared failed-partitions fixture held in the {@code failedPartitions} field. */
    private FailedPartitions failedPartitions() {
        return failedPartitions;
    }

    /**
     * Convenience factory that wraps the two arguments in a new {@code OffsetAndEpoch}.
     *
     * @param j first constructor argument; the tests in this class pass the initial offset here
     * @param i second constructor argument; callers in this class pass
     *          {@code offsetAndEpoch$default$2()} (presumably the leader epoch -- confirm against
     *          {@code OffsetAndEpoch})
     * @return a freshly constructed {@code OffsetAndEpoch(j, i)}
     */
    private OffsetAndEpoch offsetAndEpoch(long j, int i) {
        return new OffsetAndEpoch(j, i);
    }

    /**
     * Supplies the value the tests pass as the second argument of
     * {@code offsetAndEpoch(long, int)}; the {@code $default$2} suffix suggests this is the
     * Scala-compiler-generated default for that parameter.
     *
     * @return always {@code 1}
     */
    private int offsetAndEpoch$default$2() {
        return 1;
    }

    /** JUnit teardown hook: asks {@code TestUtils} to clear the Yammer metrics after each test. */
    @After
    public void cleanup() {
        TestUtils$.MODULE$.clearYammerMetrics();
    }

    /**
     * Builds a {@code ReplicaFetcherThread} from a broker config created with default settings and
     * checks that the fetch, offsets-for-leader-epoch and list-offsets request versions it reports
     * equal the latest versions exposed by {@code ApiKeys}.
     */
    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        // The only interaction scripted on the mocked replica manager is brokerTopicStats().
        ReplicaManager mockReplicaManager = EasyMock.mock(ReplicaManager.class);
        EasyMock.expect(mockReplicaManager.brokerTopicStats())
                .andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay(mockReplicaManager);

        ReplicaFetcherThread thread = new ReplicaFetcherThread(
                "bob",
                0,
                brokerEndPoint(),
                config,
                failedPartitions(),
                mockReplicaManager,
                new Metrics(),
                new SystemTime(),
                QuotaFactory$UnboundedQuota$.MODULE$,
                None$.MODULE$,
                None$.MODULE$);

        Assert.assertEquals(ApiKeys.FETCH.latestVersion(), thread.fetchRequestVersion());
        Assert.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), thread.offsetForLeaderEpochRequestVersion());
        Assert.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), thread.listOffsetRequestVersion());
    }

    /**
     * Checks the fetcher's behaviour when the local log reports a latest epoch for only some of
     * the partitions it is given.
     *
     * <p>The mocked log answers {@code latestEpoch()} with {@code Some(5)} twice and with
     * {@code None} once. Three partitions are added to the fetcher, and the mocked leader holds
     * epoch-end offsets for {@code t1p0} and {@code t1p1} only. Across three {@code doWork()}
     * rounds the mock network must report exactly one epoch request, while its fetch counter
     * grows 1, 2, 3. Partition states are checked before the first round and after every round.
     */
    @Test
    public void shouldFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        // Set up the mocks.
        ReplicationQuotaManager quota = EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = EasyMock.createMock(ReplicaManager.class);

        // Script the mocks: latestEpoch() is defined for two calls and undefined for the third.
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(log.logEndOffset()).andReturn(0L).anyTimes();
        EasyMock.expect(log.highWatermark()).andReturn(0L).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(5)).once();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(5)).once();
        EasyMock.expect(log.latestEpoch()).andReturn(None$.MODULE$).once();
        EasyMock.expect(log.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(0L, 5))).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);

        // truncateTo returns nothing, so its expectation is attached via expectLastCall().
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expectLastCall().times(3);

        EasyMock.replay(replicaManager, logManager, quota, partition, log);

        // Epoch-end offsets the mocked leader hands back; t2p1 deliberately has no entry.
        Tuple2[] leaderEpochOffsets = {
            new Tuple2(t1p0(), new EpochEndOffset(5, 1L)),
            new Tuple2(t1p1(), new EpochEndOffset(5, 1L))
        };

        // Keep a handle on the mock network so its request counters can be asserted below.
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(
                (Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(
                        Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(leaderEpochOffsets))).asJava(),
                brokerEndPoint(),
                new SystemTime());

        ReplicaFetcherThread thread = new ReplicaFetcherThread(
                "bob",
                0,
                brokerEndPoint(),
                config,
                failedPartitions(),
                replicaManager,
                new Metrics(),
                new SystemTime(),
                quota,
                None$.MODULE$,
                new Some(mockNetwork));

        // All three partitions start at offset 0 with the default second argument.
        Tuple2[] initialOffsets = {
            new Tuple2(t1p0(), offsetAndEpoch(0L, offsetAndEpoch$default$2())),
            new Tuple2(t1p1(), offsetAndEpoch(0L, offsetAndEpoch$default$2())),
            new Tuple2(t2p1(), offsetAndEpoch(0L, offsetAndEpoch$default$2()))
        };
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialOffsets)));
        assertPartitionStates(thread, false, true, false);

        // Round 1: one epoch request and one fetch.
        thread.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(1L, mockNetwork.fetchCount());
        assertPartitionStates(thread, true, false, false);

        // Round 2: no further epoch request, one more fetch.
        thread.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(2L, mockNetwork.fetchCount());
        assertPartitionStates(thread, true, false, false);

        // Round 3: still a single epoch request overall.
        thread.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(3L, mockNetwork.fetchCount());
        assertPartitionStates(thread, true, false, false);

        EasyMock.verify(logManager);
    }

    /**
     * Runs the per-partition state check against each of the three fixture partitions
     * ({@code t1p0}, {@code t1p1}, {@code t2p1}), in that order.
     *
     * @param abstractFetcherThread fetcher whose partition states are checked
     * @param z  first expected flag forwarded to the per-partition check (presumably
     *           "ready for fetch" -- confirm against the helper)
     * @param z2 second expected flag forwarded to the per-partition check (presumably
     *           "truncating log" -- confirm against the helper)
     * @param z3 third expected flag forwarded to the per-partition check (presumably
     *           "delayed" -- confirm against the helper)
     */
    public void assertPartitionStates(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3) {
        // A java.util list is enough to visit the fixtures in order and avoids hand-building
        // Scala cons cells, whose decompiled class name does not resolve in Java source.
        for (TopicPartition topicPartition : java.util.Arrays.asList(t1p0(), t1p1(), t2p1())) {
            $anonfun$assertPartitionStates$1(abstractFetcherThread, z, z2, z3, topicPartition);
        }
    }

    /**
     * Checks that an exception thrown by the blocking sender during a leader-epoch request is
     * turned into per-partition error results instead of propagating.
     *
     * <p>The mocked {@code BlockingSend} throws a {@code NullPointerException} on its single
     * expected {@code sendRequest} call. {@code fetchEpochEndOffsets} must then return, for both
     * requested partitions, an {@code EpochEndOffset} carrying {@code UNKNOWN_SERVER_ERROR} with
     * epoch {@code -1} and offset {@code -1}.
     */
    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        // The sender fails on its one and only expected request.
        BlockingSend mockBlockingSend = EasyMock.createMock(BlockingSend.class);
        EasyMock.expect(mockBlockingSend.sendRequest((AbstractRequest.Builder) EasyMock.anyObject()))
                .andThrow(new NullPointerException())
                .once();

        ReplicaManager replicaManager = EasyMock.mock(ReplicaManager.class);
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay(mockBlockingSend, replicaManager);

        // The quota argument is intentionally null, as in the original test.
        ReplicaFetcherThread thread = new ReplicaFetcherThread(
                "bob",
                0,
                brokerEndPoint(),
                config,
                failedPartitions(),
                replicaManager,
                new Metrics(),
                new SystemTime(),
                (ReplicaQuota) null,
                None$.MODULE$,
                new Some(mockBlockingSend));

        Tuple2[] epochRequests = {
            new Tuple2(t1p0(), new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0)),
            new Tuple2(t1p1(), new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0))
        };
        scala.collection.Map result =
                thread.fetchEpochEndOffsets(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(epochRequests)));

        Tuple2[] expectedEntries = {
            new Tuple2(t1p0(), new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L)),
            new Tuple2(t1p1(), new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L))
        };
        Assert.assertEquals(
                "results from leader epoch request should have undefined offset",
                Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedEntries)),
                result);

        EasyMock.verify(mockBlockingSend);
    }

    /**
     * Checks that the leader-epoch request is issued on the first round only when the local log
     * always reports a latest epoch.
     *
     * <p>The mocked log answers {@code latestEpoch()} with {@code Some(5)} on every call, and the
     * mocked leader holds epoch-end offsets for both added partitions ({@code t1p0} and
     * {@code t1p1}). Across three {@code doWork()} rounds the mock network must report exactly one
     * epoch request, while its fetch counter grows 1, 2, 3.
     */
    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBoth() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        // Set up the mocks.
        LogManager logManager = EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = EasyMock.createMock(ReplicaManager.class);

        // Script the mocks: the local log always knows its latest epoch (5).
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(log.highWatermark()).andReturn(0L).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(5)).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(0L, 5))).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);

        // truncateTo returns nothing, so its expectation is attached via expectLastCall().
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expectLastCall().times(2);

        EasyMock.replay(replicaManager, logManager, partition, log);

        // Epoch-end offsets the mocked leader hands back for the two partitions.
        Tuple2[] leaderEpochOffsets = {
            new Tuple2(t1p0(), new EpochEndOffset(5, 1L)),
            new Tuple2(t1p1(), new EpochEndOffset(5, 1L))
        };

        // Keep a handle on the mock network so its request counters can be asserted below.
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(
                (Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(
                        Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(leaderEpochOffsets))).asJava(),
                brokerEndPoint(),
                new SystemTime());

        ReplicaFetcherThread thread = new ReplicaFetcherThread(
                "bob",
                0,
                brokerEndPoint(),
                config,
                failedPartitions(),
                replicaManager,
                new Metrics(),
                new SystemTime(),
                QuotaFactory$UnboundedQuota$.MODULE$,
                None$.MODULE$,
                new Some(mockNetwork));

        Tuple2[] initialOffsets = {
            new Tuple2(t1p0(), offsetAndEpoch(0L, offsetAndEpoch$default$2())),
            new Tuple2(t1p1(), offsetAndEpoch(0L, offsetAndEpoch$default$2()))
        };
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialOffsets)));

        // Round 1: one epoch request and one fetch.
        thread.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(1L, mockNetwork.fetchCount());

        // Round 2: no further epoch request, one more fetch.
        thread.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(2L, mockNetwork.fetchCount());

        // Round 3: still a single epoch request overall.
        thread.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(3L, mockNetwork.fetchCount());

        EasyMock.verify(logManager);
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        Seq seq = (Seq) TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map(properties -> {
            return KafkaConfig$.MODULE$.fromProps(properties);
        }, Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(200 - 1)).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(200, 5))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(200)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        JavaConverters$ javaConverters$ = JavaConverters$.MODULE$;
        Map$ map$ = Map$.MODULE$;
        Predef$ predef$ = Predef$.MODULE$;
        Tuple2[] tuple2Arr = new Tuple2[2];
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(t1p0());
        EpochEndOffset epochEndOffset = new EpochEndOffset(5, 156L);
        if (predef$ArrowAssoc$ == null) {
            throw null;
        }
        tuple2Arr[0] = new Tuple2(ArrowAssoc, epochEndOffset);
        Predef$ArrowAssoc$ predef$ArrowAssoc$2 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc2 = Predef$.MODULE$.ArrowAssoc(t2p1());
        EpochEndOffset epochEndOffset2 = new EpochEndOffset(5, 172L);
        if (predef$ArrowAssoc$2 == null) {
            throw null;
        }
        tuple2Arr[1] = new Tuple2(ArrowAssoc2, epochEndOffset2);
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), (KafkaConfig) seq.head(), failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend((Map) javaConverters$.mapAsJavaMapConverter(map$.apply(predef$.wrapRefArray(tuple2Arr))).asJava(), brokerEndPoint(), new SystemTime())));
        Map$ map$2 = Map$.MODULE$;
        Predef$ predef$2 = Predef$.MODULE$;
        Tuple2[] tuple2Arr2 = new Tuple2[2];
        Predef$ArrowAssoc$ predef$ArrowAssoc$3 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc3 = Predef$.MODULE$.ArrowAssoc(t1p0());
        OffsetAndEpoch offsetAndEpoch = offsetAndEpoch(0L, offsetAndEpoch$default$2());
        if (predef$ArrowAssoc$3 == null) {
            throw null;
        }
        tuple2Arr2[0] = new Tuple2(ArrowAssoc3, offsetAndEpoch);
        Predef$ArrowAssoc$ predef$ArrowAssoc$4 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc4 = Predef$.MODULE$.ArrowAssoc(t2p1());
        OffsetAndEpoch offsetAndEpoch2 = offsetAndEpoch(0L, offsetAndEpoch$default$2());
        if (predef$ArrowAssoc$4 == null) {
            throw null;
        }
        tuple2Arr2[1] = new Tuple2(ArrowAssoc4, offsetAndEpoch2);
        replicaFetcherThread.addPartitions(map$2.apply(predef$2.wrapRefArray(tuple2Arr2)));
        replicaFetcherThread.doWork();
        Assert.assertTrue(new StringBuilder(58).append("Expected ").append(t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(newCapture.getValues()).append(")").toString(), ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(newCapture.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(156)));
        Assert.assertTrue(new StringBuilder(58).append("Expected ").append(t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(newCapture.getValues()).append(")").toString(), ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(newCapture.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(172)));
    }

    /**
     * The mocked leader answers with epoch 4 for both partitions (offsets 156 and 202), while the
     * mocked follower log reports latest epoch 5, no end offset for epoch 4, and a log end offset
     * of 200. After a single {@code doWork()} the captured truncation offsets must contain 156
     * (for t1p0) and 200 (for t2p1).
     */
    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        final int initialLeo = 200;
        TestUtils$ testUtils = TestUtils$.MODULE$;

        // Collects every offset passed to Partition.truncateTo during the run.
        Capture truncationCapture = EasyMock.newCapture(CaptureType.ALL);

        Seq brokerConfigs = (Seq) testUtils.createBrokerConfigs(
                1,
                "localhost:1234",
                testUtils.createBrokerConfigs$default$3(),
                testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(),
                testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(),
                testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(),
                testUtils.createBrokerConfigs$default$10(),
                testUtils.createBrokerConfigs$default$11(),
                testUtils.createBrokerConfigs$default$12(),
                testUtils.createBrokerConfigs$default$13(),
                testUtils.createBrokerConfigs$default$14(),
                testUtils.createBrokerConfigs$default$15(),
                testUtils.createBrokerConfigs$default$16())
            .map(props -> KafkaConfig$.MODULE$.fromProps(props), Seq$.MODULE$.canBuildFrom());

        // Collaborators.
        ReplicationQuotaManager quota = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // truncateTo returns no value: invoke it on the mock in record state, then attach
        // anyTimes() to that recorded call through the expect(...) that follows.
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncationCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(initialLeo - 3)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(4)).andReturn(None$.MODULE$).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(initialLeo)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(alterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        EasyMock.replay(replicaManager, logManager, quota, partition, log);

        // Offsets-for-leader-epoch reply served by the mocked network layer.
        Tuple2[] leaderReplies = {
            new Tuple2(t1p0(), new EpochEndOffset(4, 156L)),
            new Tuple2(t2p1(), new EpochEndOffset(4, 202L))
        };
        Map leaderOffsets = (Map) JavaConverters$.MODULE$
            .mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(leaderReplies)))
            .asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(leaderOffsets, brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), (KafkaConfig) brokerConfigs.head(), failedPartitions(), replicaManager, new Metrics(), new SystemTime(), quota, None$.MODULE$, new Some(mockNetwork));

        // Both partitions start fetching at offset 0.
        Tuple2[] initialStates = {
            new Tuple2(t1p0(), offsetAndEpoch(0L, offsetAndEpoch$default$2())),
            new Tuple2(t2p1(), offsetAndEpoch(0L, offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialStates)));

        fetcher.doWork();

        SeqLike observedOffsets = (SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(truncationCapture.getValues()).asScala();
        Assert.assertTrue(
            "Expected " + t1p0() + " to truncate to offset 156 (truncation offsets: " + truncationCapture.getValues() + ")",
            observedOffsets.contains(BoxesRunTime.boxToInteger(156)));
        Assert.assertTrue(
            "Expected " + t2p1() + " to truncate to offset " + initialLeo + " (truncation offsets: " + truncationCapture.getValues() + ")",
            observedOffsets.contains(BoxesRunTime.boxToInteger(initialLeo)));
    }

    /**
     * Drives the fetcher through three {@code doWork()} rounds against a mocked leader.
     *
     * <ol>
     *   <li>The leader replies with epoch 4 (offsets 155 / 143); the mocked follower log answers
     *       {@code endOffsetForEpoch(4)} with (120, epoch 3). Expected: one epoch request, no fetch.</li>
     *   <li>The leader reply is switched to epoch 3 (offsets 101 / 102). Expected: two epoch
     *       requests, one fetch, and OffsetsForLeaderEpoch request version 3.</li>
     *   <li>Expected: still two epoch requests, two fetches.</li>
     * </ol>
     *
     * Finally the captured truncation offsets must contain 102 (t1p1) and 101 (t1p0).
     */
    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        final int initialLeo = 200;
        TestUtils$ testUtils = TestUtils$.MODULE$;

        // Collects every offset passed to Partition.truncateTo during the run.
        Capture truncationCapture = EasyMock.newCapture(CaptureType.ALL);

        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
            1,
            "localhost:1234",
            testUtils.createBrokerConfig$default$3(),
            testUtils.createBrokerConfig$default$4(),
            testUtils.createBrokerConfig$default$5(),
            testUtils.createBrokerConfig$default$6(),
            testUtils.createBrokerConfig$default$7(),
            testUtils.createBrokerConfig$default$8(),
            testUtils.createBrokerConfig$default$9(),
            testUtils.createBrokerConfig$default$10(),
            testUtils.createBrokerConfig$default$11(),
            testUtils.createBrokerConfig$default$12(),
            testUtils.createBrokerConfig$default$13(),
            testUtils.createBrokerConfig$default$14(),
            testUtils.createBrokerConfig$default$15(),
            testUtils.createBrokerConfig$default$16(),
            testUtils.createBrokerConfig$default$17(),
            testUtils.createBrokerConfig$default$18(),
            testUtils.createBrokerConfig$default$19(),
            testUtils.createBrokerConfig$default$20()));

        // Collaborators.
        ReplicationQuotaManager quota = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // truncateTo returns no value: invoke it on the mock in record state, then attach
        // anyTimes() to that recorded call through the expect(...) that follows.
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncationCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(initialLeo - 2)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(3)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(initialLeo)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(alterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        EasyMock.replay(replicaManager, logManager, quota, partition, log);

        // First leader reply: epoch 4 for both partitions.
        Tuple2[] firstReplies = {
            new Tuple2(t1p0(), new EpochEndOffset(4, 155L)),
            new Tuple2(t1p1(), new EpochEndOffset(4, 143L))
        };
        Map firstOffsets = (Map) JavaConverters$.MODULE$
            .mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(firstReplies)))
            .asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(firstOffsets, brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), config, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), quota, None$.MODULE$, new Some(mockNetwork));

        // Both partitions start fetching at offset 0.
        Tuple2[] initialStates = {
            new Tuple2(t1p0(), offsetAndEpoch(0L, offsetAndEpoch$default$2())),
            new Tuple2(t1p1(), offsetAndEpoch(0L, offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialStates)));

        // Round 1: one epoch request, no data fetch yet.
        fetcher.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(0L, mockNetwork.fetchCount());

        // Second leader reply: epoch 3 for both partitions.
        Tuple2[] secondReplies = {
            new Tuple2(t1p0(), new EpochEndOffset(3, 101L)),
            new Tuple2(t1p1(), new EpochEndOffset(3, 102L))
        };
        Map secondOffsets = (Map) JavaConverters$.MODULE$
            .mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(secondReplies)))
            .asJava();
        mockNetwork.setOffsetsForNextResponse(secondOffsets);

        // Round 2: a second epoch request, then the first fetch.
        fetcher.doWork();
        Assert.assertEquals(2L, mockNetwork.epochFetchCount());
        Assert.assertEquals(1L, mockNetwork.fetchCount());
        Assert.assertEquals("OffsetsForLeaderEpochRequest version.", 3L, mockNetwork.lastUsedOffsetForLeaderEpochVersion());

        // Round 3: no further epoch request, one more fetch.
        fetcher.doWork();
        Assert.assertEquals(2L, mockNetwork.epochFetchCount());
        Assert.assertEquals(2L, mockNetwork.fetchCount());

        SeqLike observedOffsets = (SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(truncationCapture.getValues()).asScala();
        Assert.assertTrue(
            "Expected " + t1p1() + " to truncate to offset 102 (truncation offsets: " + truncationCapture.getValues() + ")",
            observedOffsets.contains(BoxesRunTime.boxToInteger(102)));
        Assert.assertTrue(
            "Expected " + t1p0() + " to truncate to offset 101 (truncation offsets: " + truncationCapture.getValues() + ")",
            observedOffsets.contains(BoxesRunTime.boxToInteger(101)));
    }

    /**
     * Runs the fetcher with the inter-broker protocol version property set to "0.11.0" against a
     * mocked leader that replies with epoch -1 and end offsets 155 (t1p0) and 143 (t1p1).
     *
     * <p>Expected after the first {@code doWork()}: one epoch request, one fetch, and
     * OffsetsForLeaderEpoch request version 0. After the second: still one epoch request and two
     * fetches. The captured truncation offsets must contain 155 and 143, i.e. the offsets supplied
     * by the leader.
     */
    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        final int initialLeo = 200;
        TestUtils$ testUtils = TestUtils$.MODULE$;

        // Collects every offset passed to Partition.truncateTo during the run.
        Capture truncationCapture = EasyMock.newCapture(CaptureType.ALL);

        Properties props = testUtils.createBrokerConfig(
            1,
            "localhost:1234",
            testUtils.createBrokerConfig$default$3(),
            testUtils.createBrokerConfig$default$4(),
            testUtils.createBrokerConfig$default$5(),
            testUtils.createBrokerConfig$default$6(),
            testUtils.createBrokerConfig$default$7(),
            testUtils.createBrokerConfig$default$8(),
            testUtils.createBrokerConfig$default$9(),
            testUtils.createBrokerConfig$default$10(),
            testUtils.createBrokerConfig$default$11(),
            testUtils.createBrokerConfig$default$12(),
            testUtils.createBrokerConfig$default$13(),
            testUtils.createBrokerConfig$default$14(),
            testUtils.createBrokerConfig$default$15(),
            testUtils.createBrokerConfig$default$16(),
            testUtils.createBrokerConfig$default$17(),
            testUtils.createBrokerConfig$default$18(),
            testUtils.createBrokerConfig$default$19(),
            testUtils.createBrokerConfig$default$20());
        // Pin the inter-broker protocol to an old version; this is the scenario under test.
        props.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);

        // Collaborators.
        ReplicationQuotaManager quota = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // truncateTo returns no value: invoke it on the mock in record state, then attach
        // anyTimes() to that recorded call through the expect(...) that follows.
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncationCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(initialLeo - 2)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(3)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(initialLeo)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(alterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        EasyMock.replay(replicaManager, logManager, quota, partition, log);

        // Leader reply: epoch -1 with an end offset per partition.
        Tuple2[] leaderReplies = {
            new Tuple2(t1p0(), new EpochEndOffset(-1, 155L)),
            new Tuple2(t1p1(), new EpochEndOffset(-1, 143L))
        };
        Map leaderOffsets = (Map) JavaConverters$.MODULE$
            .mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(leaderReplies)))
            .asJava();
        // Keep a handle on the mocked network layer so its request counters can be asserted below.
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(leaderOffsets, brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), config, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), quota, None$.MODULE$, new Some(mockNetwork));

        // Both partitions start fetching at offset 0.
        Tuple2[] initialStates = {
            new Tuple2(t1p0(), offsetAndEpoch(0L, offsetAndEpoch$default$2())),
            new Tuple2(t1p1(), offsetAndEpoch(0L, offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialStates)));

        // Round 1: a single epoch request and a fetch in the same pass.
        fetcher.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(1L, mockNetwork.fetchCount());
        Assert.assertEquals("OffsetsForLeaderEpochRequest version.", 0L, mockNetwork.lastUsedOffsetForLeaderEpochVersion());

        // Round 2: no additional epoch request, one more fetch.
        fetcher.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(2L, mockNetwork.fetchCount());

        SeqLike observedOffsets = (SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(truncationCapture.getValues()).asScala();
        Assert.assertTrue(
            "Expected " + t1p0() + " to truncate to offset 155 (truncation offsets: " + truncationCapture.getValues() + ")",
            observedOffsets.contains(BoxesRunTime.boxToInteger(155)));
        Assert.assertTrue(
            "Expected " + t1p1() + " to truncate to offset 143 (truncation offsets: " + truncationCapture.getValues() + ")",
            observedOffsets.contains(BoxesRunTime.boxToInteger(143)));
    }

    /**
     * The mocked leader replies with epoch -1 and offset -1 for t1p0, which was added with an
     * initial fetch offset of 100. After a single {@code doWork()} the captured truncation offset
     * must equal 100.
     */
    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        final long initialFetchOffset = 100L;
        TestUtils$ testUtils = TestUtils$.MODULE$;

        // Collects every offset passed to Partition.truncateTo during the run.
        Capture truncationCapture = EasyMock.newCapture(CaptureType.ALL);

        Seq brokerConfigs = (Seq) testUtils.createBrokerConfigs(
                1,
                "localhost:1234",
                testUtils.createBrokerConfigs$default$3(),
                testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(),
                testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(),
                testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(),
                testUtils.createBrokerConfigs$default$10(),
                testUtils.createBrokerConfigs$default$11(),
                testUtils.createBrokerConfigs$default$12(),
                testUtils.createBrokerConfigs$default$13(),
                testUtils.createBrokerConfigs$default$14(),
                testUtils.createBrokerConfigs$default$15(),
                testUtils.createBrokerConfigs$default$16())
            .map(props -> KafkaConfig$.MODULE$.fromProps(props), Seq$.MODULE$.canBuildFrom());

        // Collaborators.
        ReplicationQuotaManager quota = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // truncateTo returns no value: invoke it on the mock in record state, then attach
        // anyTimes() to that recorded call through the expect(...) that follows.
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncationCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(initialFetchOffset)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5)));
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(alterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        EasyMock.replay(replicaManager, logManager, quota, partition, log);

        // Leader reply for the single partition: epoch -1, offset -1.
        Tuple2[] leaderReplies = {
            new Tuple2(t1p0(), new EpochEndOffset(-1, -1L))
        };
        Map leaderOffsets = (Map) JavaConverters$.MODULE$
            .mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(leaderReplies)))
            .asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(leaderOffsets, brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), (KafkaConfig) brokerConfigs.head(), failedPartitions(), replicaManager, new Metrics(), new SystemTime(), quota, None$.MODULE$, new Some(mockNetwork));

        // The partition starts fetching at the initial fetch offset.
        Tuple2[] initialStates = {
            new Tuple2(t1p0(), offsetAndEpoch(initialFetchOffset, offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialStates)));

        fetcher.doWork();

        Assert.assertEquals(initialFetchOffset, BoxesRunTime.unboxToLong(truncationCapture.getValue()));
    }

    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        Seq seq = (Seq) TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map(properties -> {
            return KafkaConfig$.MODULE$.fromProps(properties);
        }, Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.highWatermark())).andReturn(BoxesRunTime.boxToLong(100)).anyTimes();
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog).anyTimes();
        EasyMock.expect(abstractLog.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(abstractLog.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(300, 5))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).andReturn(BoxesRunTime.boxToLong(300)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, abstractLog});
        JavaConverters$ javaConverters$ = JavaConverters$.MODULE$;
        scala.collection.mutable.Map$ map$ = scala.collection.mutable.Map$.MODULE$;
        Predef$ predef$ = Predef$.MODULE$;
        Tuple2[] tuple2Arr = new Tuple2[2];
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(t1p0());
        EpochEndOffset epochEndOffset = new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, -1, -1L);
        if (predef$ArrowAssoc$ == null) {
            throw null;
        }
        tuple2Arr[0] = new Tuple2(ArrowAssoc, epochEndOffset);
        Predef$ArrowAssoc$ predef$ArrowAssoc$2 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc2 = Predef$.MODULE$.ArrowAssoc(t1p1());
        EpochEndOffset epochEndOffset2 = new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L);
        if (predef$ArrowAssoc$2 == null) {
            throw null;
        }
        tuple2Arr[1] = new Tuple2(ArrowAssoc2, epochEndOffset2);
        Map map = (Map) javaConverters$.mutableMapAsJavaMapConverter(map$.apply(predef$.wrapRefArray(tuple2Arr))).asJava();
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), (KafkaConfig) seq.head(), failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, None$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend(map, brokerEndPoint(), new SystemTime())));
        Map$ map$2 = Map$.MODULE$;
        Predef$ predef$2 = Predef$.MODULE$;
        Tuple2[] tuple2Arr2 = new Tuple2[2];
        Predef$ArrowAssoc$ predef$ArrowAssoc$3 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc3 = Predef$.MODULE$.ArrowAssoc(t1p0());
        OffsetAndEpoch offsetAndEpoch = offsetAndEpoch(0L, offsetAndEpoch$default$2());
        if (predef$ArrowAssoc$3 == null) {
            throw null;
        }
        tuple2Arr2[0] = new Tuple2(ArrowAssoc3, offsetAndEpoch);
        Predef$ArrowAssoc$ predef$ArrowAssoc$4 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc4 = Predef$.MODULE$.ArrowAssoc(t1p1());
        OffsetAndEpoch offsetAndEpoch2 = offsetAndEpoch(0L, offsetAndEpoch$default$2());
        if (predef$ArrowAssoc$4 == null) {
            throw null;
        }
        tuple2Arr2[1] = new Tuple2(ArrowAssoc4, offsetAndEpoch2);
        replicaFetcherThread.addPartitions(map$2.apply(predef$2.wrapRefArray(tuple2Arr2)));
        RichInt$ richInt$ = RichInt$.MODULE$;
        if (Predef$.MODULE$ == null) {
            throw null;
        }
        Range.Inclusive inclusive = richInt$.to$extension0(0, 3);
        if (inclusive == null) {
            throw null;
        }
        if (!inclusive.isEmpty()) {
            int start = inclusive.start();
            while (true) {
                int i = start;
                replicaFetcherThread.doWork();
                if (i == inclusive.scala$collection$immutable$Range$$lastElement()) {
                    break;
                } else {
                    start = i + inclusive.step();
                }
            }
        }
        Assert.assertEquals(0L, newCapture.getValues().size());
        map.put(t1p0(), new EpochEndOffset(5, 156L));
        replicaFetcherThread.doWork();
        Assert.assertEquals(156L, BoxesRunTime.unboxToLong(newCapture.getValue()));
    }

    /**
     * Adds two partitions to a fetcher thread, checks that both report the {@code Truncating}
     * state, runs one {@code doWork()} pass and checks that both then report {@code Fetching}.
     *
     * <p>The mock blocking send is seeded with {@code EpochEndOffset(4, 1)} for both partitions,
     * the mocked log reports latest epoch 4 with end offset 0, and the partition mock expects
     * {@code truncateTo(0, false)} exactly twice.
     */
    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        final KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        // Collaborators: only the partition and the alter-log-dirs manager are non-nice mocks.
        final ReplicationQuotaManager quota = EasyMock.createNiceMock(ReplicationQuotaManager.class);
        final LogManager logManager = EasyMock.createNiceMock(LogManager.class);
        final ReplicaAlterLogDirsManager alterLogDirsManager = EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        final AbstractLog log = EasyMock.createNiceMock(AbstractLog.class);
        final Partition partition = EasyMock.createMock(Partition.class);
        final ReplicaManager replicaManager = EasyMock.createNiceMock(ReplicaManager.class);

        // truncateTo returns void, so the expectation is attached to the last recorded call.
        partition.truncateTo(0L, false);
        EasyMock.expectLastCall().times(2);
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(4))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(0L, 4))).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(alterLogDirsManager).anyTimes();
        stub(partition, replicaManager, log);
        EasyMock.replay(replicaManager, logManager, quota, partition, log);

        // Epoch/end-offset pairs handed to the mock blocking send, keyed by partition.
        final Tuple2[] seededEpochOffsets = {
            new Tuple2(t1p0(), new EpochEndOffset(4, 1L)),
            new Tuple2(t1p1(), new EpochEndOffset(4, 1L))
        };
        final Map seededOffsetsByPartition = (Map) JavaConverters$.MODULE$
                .mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(seededEpochOffsets)))
                .asJava();
        final ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob",
                0,
                brokerEndPoint(),
                config,
                failedPartitions(),
                replicaManager,
                new Metrics(),
                new SystemTime(),
                quota,
                None$.MODULE$,
                new Some(new ReplicaFetcherMockBlockingSend(seededOffsetsByPartition, brokerEndPoint(), new SystemTime())));

        final Tuple2[] initialOffsets = {
            new Tuple2(t1p0(), offsetAndEpoch(0L, offsetAndEpoch$default$2())),
            new Tuple2(t1p1(), offsetAndEpoch(0L, offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialOffsets)));

        final TopicPartition[] addedPartitions = {t1p0(), t1p1()};

        // Before any work is done, every added partition reports the Truncating state.
        for (final TopicPartition tp : addedPartitions) {
            final Option current = fetcher.fetchState(tp);
            final Object observedState = current.isEmpty()
                    ? None$.MODULE$
                    : new Some(((PartitionFetchState) current.get()).state());
            Assert.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), observedState);
        }

        fetcher.doWork();

        // After one pass, every added partition reports the Fetching state.
        for (final TopicPartition tp : addedPartitions) {
            final Option current = fetcher.fetchState(tp);
            final Object observedState = current.isEmpty()
                    ? None$.MODULE$
                    : new Some(((PartitionFetchState) current.get()).state());
            Assert.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), observedState);
        }
    }

    /**
     * Removes {@code t1p0} from the fetcher through the mock blocking send's epoch-request callback
     * and checks that the single expected {@code truncateTo} call captured offset 49.
     *
     * <p>49 is the end offset seeded for {@code t1p1}; {@code t1p0} is seeded with 52.
     */
    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        final KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        // Log end offset reported by the mocked log; the high watermark is two below it.
        final int initialLogEndOffset = 100;

        final Capture truncateToCapture = EasyMock.newCapture(CaptureType.ALL);
        final ReplicationQuotaManager quota = EasyMock.createNiceMock(ReplicationQuotaManager.class);
        final LogManager logManager = EasyMock.createNiceMock(LogManager.class);
        final ReplicaAlterLogDirsManager alterLogDirsManager = EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        final AbstractLog log = EasyMock.createNiceMock(AbstractLog.class);
        final Partition partition = EasyMock.createMock(Partition.class);
        final ReplicaManager replicaManager = EasyMock.createNiceMock(ReplicaManager.class);

        // Exactly one truncation is expected; its offset argument is captured for the final assert.
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expectLastCall().once();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(initialLogEndOffset - 2)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(initialLogEndOffset, 5))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(initialLogEndOffset)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(alterLogDirsManager).anyTimes();
        stub(partition, replicaManager, log);
        EasyMock.replay(replicaManager, logManager, quota, partition, log);

        // Epoch/end-offset pairs handed to the mock blocking send, keyed by partition.
        final Tuple2[] seededEpochOffsets = {
            new Tuple2(t1p0(), new EpochEndOffset(5, 52L)),
            new Tuple2(t1p1(), new EpochEndOffset(5, 49L))
        };
        final Map seededOffsetsByPartition = (Map) JavaConverters$.MODULE$
                .mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(seededEpochOffsets)))
                .asJava();
        final ReplicaFetcherMockBlockingSend mockNetwork =
                new ReplicaFetcherMockBlockingSend(seededOffsetsByPartition, brokerEndPoint(), new SystemTime());
        final ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob",
                0,
                brokerEndPoint(),
                config,
                failedPartitions(),
                replicaManager,
                new Metrics(),
                new SystemTime(),
                quota,
                None$.MODULE$,
                new Some(mockNetwork));

        final Tuple2[] initialOffsets = {
            new Tuple2(t1p0(), offsetAndEpoch(0L, offsetAndEpoch$default$2())),
            new Tuple2(t1p1(), offsetAndEpoch(0L, offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialOffsets)));

        // The callback drops t1p0 from the fetcher when the mock invokes it.
        final TopicPartition removedPartition = t1p0();
        mockNetwork.setEpochRequestCallback(() -> {
            fetcher.removePartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{removedPartition})));
        });

        fetcher.doWork();

        Assert.assertEquals(49L, BoxesRunTime.unboxToLong(truncateToCapture.getValue()));
    }

    /**
     * Starts a fetcher thread whose mocked {@code BlockingSend} throws from both
     * {@code initiateClose()} and {@code close()}, then initiates and awaits shutdown.
     *
     * <p>The final {@code verify} checks that each of the two throwing calls was made once.
     */
    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        final KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        // Both shutdown hooks on the blocking send are void methods set up to throw once.
        final BlockingSend throwingSend = EasyMock.createMock(BlockingSend.class);
        throwingSend.initiateClose();
        EasyMock.expectLastCall().andThrow(new IllegalArgumentException()).once();
        throwingSend.close();
        EasyMock.expectLastCall().andThrow(new IllegalStateException()).once();

        final ReplicaManager replicaManager = EasyMock.mock(ReplicaManager.class);
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay(throwingSend, replicaManager);

        final ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob",
                0,
                brokerEndPoint(),
                config,
                failedPartitions(),
                replicaManager,
                new Metrics(),
                new SystemTime(),
                (ReplicaQuota) null,
                None$.MODULE$,
                new Some(throwingSend));
        fetcher.start();
        fetcher.initiateShutdown();
        fetcher.awaitShutdown();

        EasyMock.verify(throwingSend);
    }

    /**
     * Runs the shared {@code processPartitionData} scenario with the reassignment flag set to
     * {@code true}.
     */
    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        final boolean reassigning = true;
        assertProcessPartitionDataWhen(reassigning);
    }

    /**
     * Runs the shared {@code processPartitionData} scenario with the reassignment flag set to
     * {@code false}.
     */
    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        final boolean reassigning = false;
        assertProcessPartitionDataWhen(reassigning);
    }

    /**
     * Feeds one single-record batch through {@code ReplicaFetcherThread.processPartitionData} and
     * checks the byte-rate meters on a real {@code BrokerTopicStats}.
     *
     * <p>The replication-bytes-in meter must always equal the batch size. The
     * reassignment-bytes-in meter must equal the batch size when {@code z} is {@code true} and
     * stay at 0 otherwise.
     *
     * @param z value the mocked partition returns from both {@code isReassigning()} and
     *          {@code isAddingLocalReplica()}
     */
    private void assertProcessPartitionDataWhen(boolean z) {
        final boolean isReassigning = z;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        BlockingSend blockingSend = (BlockingSend) EasyMock.createNiceMock(BlockingSend.class);
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition) EasyMock.createNiceMock(Partition.class);
        EasyMock.expect(partition.localLogOrException()).andReturn(abstractLog);
        EasyMock.expect(BoxesRunTime.boxToBoolean(partition.isReassigning())).andReturn(BoxesRunTime.boxToBoolean(isReassigning));
        EasyMock.expect(BoxesRunTime.boxToBoolean(partition.isAddingLocalReplica())).andReturn(BoxesRunTime.boxToBoolean(isReassigning));
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);
        EasyMock.expect(replicaManager.nonOfflinePartition((TopicPartition) EasyMock.anyObject())).andReturn(new Some(partition));
        // A real stats object (not a mock) so the meters can be read back after the call.
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(brokerTopicStats).anyTimes();
        ReplicaQuota replicaQuota = (ReplicaQuota) EasyMock.createNiceMock(ReplicaQuota.class);
        EasyMock.replay(new Object[]{blockingSend, replicaManager, partition, abstractLog, replicaQuota});
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicaQuota, None$.MODULE$, new Some(blockingSend));

        // Keep the batch in a named local: the assertions below compare the meters to its size.
        MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))});
        replicaFetcherThread.processPartitionData(t1p0(), 0L, new FetchResponse.PartitionData(Errors.NONE, 0L, 0L, 0L, Optional.empty(), Collections.emptyList(), records));

        if (isReassigning) {
            Assert.assertEquals(records.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        } else {
            Assert.assertEquals(0L, ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        }
        Assert.assertEquals(records.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().replicationBytesInRate().get()).count());
    }

    /**
     * Records, for each of {@code t1p0}, {@code t1p1} and {@code t2p1}, that the replica manager
     * returns the given log from {@code localLogOrException} and the given partition (wrapped in
     * {@code Some}) from {@code nonOfflinePartition}, any number of times.
     *
     * @param partition      partition returned by {@code nonOfflinePartition}
     * @param replicaManager mock the expectations are recorded on
     * @param abstractLog    log returned by {@code localLogOrException}
     */
    public void stub(Partition partition, ReplicaManager replicaManager, AbstractLog abstractLog) {
        final TopicPartition[] stubbedPartitions = {t1p0(), t1p1(), t2p1()};
        for (final TopicPartition tp : stubbedPartitions) {
            EasyMock.expect(replicaManager.localLogOrException(tp)).andReturn(abstractLog).anyTimes();
            EasyMock.expect(replicaManager.nonOfflinePartition(tp)).andReturn(new Some(partition)).anyTimes();
        }
    }

    /**
     * Asserts that the fetcher has a fetch state for {@code topicPartition} and that its
     * ready-for-fetch, truncating and delayed flags match the expected values.
     *
     * @param abstractFetcherThread fetcher whose state is inspected
     * @param z                     expected value of {@code isReadyForFetch()}
     * @param z2                    expected value of {@code isTruncating()}
     * @param z3                    expected value of {@code isDelayed()}
     * @param topicPartition        partition to look up
     */
    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3, TopicPartition topicPartition) {
        Assert.assertTrue(abstractFetcherThread.fetchState(topicPartition).isDefined());
        final PartitionFetchState state = (PartitionFetchState) abstractFetcherThread.fetchState(topicPartition).get();

        // Shared prefix of the three failure messages.
        final String subject = "Partition " + topicPartition + " should";

        Assert.assertEquals(
                subject + (z ? "" : " NOT") + " be ready for fetching",
                BoxesRunTime.boxToBoolean(z),
                BoxesRunTime.boxToBoolean(state.isReadyForFetch()));
        Assert.assertEquals(
                subject + (z2 ? "" : " NOT") + " be truncating its log",
                BoxesRunTime.boxToBoolean(z2),
                BoxesRunTime.boxToBoolean(state.isTruncating()));
        Assert.assertEquals(
                subject + (z3 ? "" : " NOT") + " be delayed",
                BoxesRunTime.boxToBoolean(z3),
                BoxesRunTime.boxToBoolean(state.isDelayed()));
    }

    /**
     * Object-returning adapter around {@code $anonfun$assertPartitionStates$1}: forwards all
     * arguments unchanged and returns {@code BoxedUnit.UNIT}.
     */
    public static final /* synthetic */ Object $anonfun$assertPartitionStates$1$adapted(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3, TopicPartition topicPartition) {
        $anonfun$assertPartitionStates$1(abstractFetcherThread, z, z2, z3, topicPartition);
        final Object unitResult = BoxedUnit.UNIT;
        return unitResult;
    }
}
