package kafka.server;

import com.yammer.metrics.core.Meter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.Log;
import kafka.log.LogManager;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.SystemTime;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import scala.None$;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.immutable.List$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: ReplicaFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015e\u0001B\u0001\u0003\u0001\u001d\u0011\u0001DU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$G+Z:u\u0015\t\u0019A!\u0001\u0004tKJ4XM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\t!\tIA\"D\u0001\u000b\u0015\u0005Y\u0011!B:dC2\f\u0017BA\u0007\u000b\u0005\u0019\te.\u001f*fM\")q\u0002\u0001C\u0001!\u00051A(\u001b8jiz\"\u0012!\u0005\t\u0003%\u0001i\u0011A\u0001\u0005\b)\u0001\u0011\r\u0011\"\u0003\u0016\u0003\u0011!\u0018\u0007\u001d\u0019\u0016\u0003Y\u0001\"aF\u0010\u000e\u0003aQ!!\u0007\u000e\u0002\r\r|W.\\8o\u0015\t)1D\u0003\u0002\u001d;\u00051\u0011\r]1dQ\u0016T\u0011AH\u0001\u0004_J<\u0017B\u0001\u0011\u0019\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:DaA\t\u0001!\u0002\u00131\u0012!\u0002;2aB\u0002\u0003b\u0002\u0013\u0001\u0005\u0004%I!F\u0001\u0005iF\u0002\u0018\u0007\u0003\u0004'\u0001\u0001\u0006IAF\u0001\u0006iF\u0002\u0018\u0007\t\u0005\bQ\u0001\u0011\r\u0011\"\u0003\u0016\u0003\u0011!(\u0007]\u0019\t\r)\u0002\u0001\u0015!\u0003\u0017\u0003\u0015!(\u0007]\u0019!\u0011\u001da\u0003A1A\u0005\n5\naB\u0019:pW\u0016\u0014XI\u001c3Q_&tG/F\u0001/!\ty#'D\u00011\u0015\t\tD!A\u0004dYV\u001cH/\u001a:\n\u0005M\u0002$A\u0004\"s_.,'/\u00128e!>Lg\u000e\u001e\u0005\u0007k\u0001\u0001\u000b\u0011\u0002\u0018\u0002\u001f\t\u0014xn[3s\u000b:$\u0007k\\5oi\u0002Bqa\u000e\u0001C\u0002\u0013%\u0001(\u0001\tgC&dW\r\u001a)beRLG/[8ogV\t\u0011\b\u0005\u0002\u0013u%\u00111H\u0001\u0002\u0011\r\u0006LG.\u001a3QCJ$\u0018\u000e^5p]NDa!\u0010\u0001!\u0002\u0013I\u0014!\u00054bS2,G\rU1si&$\u0018n\u001c8tA!)q\b\u0001C\u0005\u0001\u0006qqN\u001a4tKR\fe\u000eZ#q_\u000eDGcA!E\u0013B\u0011!CQ\u0005\u0003\u0007\n\u0011ab\u00144gg\u0016$\u0018I\u001c3Fa>\u001c\u0007\u000eC\u0003F}\u0001\u0007a)A\u0006gKR\u001c\u0007n\u00144gg\u0016$\bCA\u0005H\u0013\tA%B\u0001\u0003M_:<\u0007b\u0002&?!\u0003\u0005\raS\u0001\fY\u0016\fG-\u001a:Fa>\u001c\u0007\u000e\u0005\u0002\n\u0019&\u0011QJ\u0003\u0002\u0004\u0013:$\b\"B(\u0001\t\u0003\u0001\u0016aB2mK\u0006tW\u000f\u001d\u000b\u0002#B\u0011\u0011BU\u0005\u0003'*\u0011A!\u00168ji\"\u0012a*\u0016\t\u0003-fk\u0011a\u0016\u0006\u00031v\tQA[;oSRL!AW,\u0003\u000b\u00053G/\u001a:\t\u000bq\u0003A\u0011\u0001)\u0002QMDw.\u001e7e'\u0016tG\rT1uKN$(+Z9vKN$h+\u001a:tS>t7OQ=EK\u001a\fW\u000f\u001c;)\u0005ms\u0006C\u0001,`\u0013\t\u0001wK\u0001\u0003UKN$\b\"\u00022\u0001\t\u0003\u0001\u0016\u0001Q:i_VdGMR3uG\"dU-\u00193fe\u0016\u0003xn\u00195SKF,Xm\u001d;JM2\u000b7\u000f^#q_\u000eDG)\u001a4j]\u0016$gi\u001c:T_6,\u0007+\u0019:uSRLwN\\:)\u0005\u0005t\u0006\"B3\u0001\t\u00031\u0017!F1tg\u0016\u0014H\u000fU1si&$\u0018n\u001c8Ti\u0006$Xm\u001d\u000b\u0006#\u001ed\u0017o\u001d\u0005\u0006Q\u0012\u0004\r![\u0001\bM\u0016$8\r[3s!\t\u0011\".\u0003\u0002l\u0005\t)\u0012IY:ue\u0006\u001cGOR3uG\",'\u000f\u00165sK\u0006$\u0007\"B7e\u0001\u0004q\u0017!F:i_VdGMQ3SK\u0006$\u0017PR8s\r\u0016$8\r\u001b\t\u0003\u0013=L!\u0001\u001d\u0006\u0003\u000f\t{w\u000e\\3b]\")!\u000f\u001aa\u0001]\u0006)2\u000f[8vY\u0012\u0014U\r\u0016:v]\u000e\fG/\u001b8h\u0019><\u0007\"\u0002;e\u0001\u0004q\u0017aD:i_VdGMQ3EK2\f\u00170\u001a3\t\u000bY\u0004A\u0011\u0001)\u0002KMDw.\u001e7e\u0011\u0006tG\r\\3Fq\u000e,\u0007\u000f^5p]\u001a\u0013x.\u001c\"m_\u000e\\\u0017N\\4TK:$\u0007FA;_\u0011\u0015I\b\u0001\"\u0001Q\u0003y\u001a\bn\\;mI\u001a+Go\u00195MK\u0006$WM]#q_\u000eDwJ\u001c$jeN$h)\u001a;dQ>sG._%g\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00138po:$vNQ8uQ\"\u0012\u0001P\u0018\u0005\u0006y\u0002!\t\u0001U\u00015g\"|W\u000f\u001c3UeVt7-\u0019;f)>|eMZ:fiN\u0003XmY5gS\u0016$\u0017J\\#q_\u000eDwJ\u001a4tKR\u0014Vm\u001d9p]N,\u0007FA>_\u0011\u0015y\b\u0001\"\u0001Q\u00035\u001b\bn\\;mIR\u0013XO\\2bi\u0016$vn\u00144gg\u0016$8\u000b]3dS\u001aLW\rZ%o\u000bB|7\r[(gMN,GOU3ta>t7/Z%g\r>dGn\\<fe\"\u000b7OT8N_J,W\t]8dQND#A 0\t\r\u0005\u0015\u0001\u0001\"\u0001Q\u0003)\u001b\bn\\;mI\u001a+Go\u00195MK\u0006$WM]#q_\u000eD7+Z2p]\u0012$\u0016.\\3JM2+\u0017\rZ3s%\u0016\u0004H.[3t/&$\b.\u00129pG\"tu\u000e^&o_^tGk\u001c$pY2|w/\u001a:)\u0007\u0005\ra\f\u0003\u0004\u0002\f\u0001!\t\u0001U\u00014g\"|W\u000f\u001c3Vg\u0016dU-\u00193fe\u0016sGm\u00144gg\u0016$\u0018JZ%oi\u0016\u0014(I]8lKJ4VM]:j_:\u0014U\r\\8xeAB3!!\u0003_\u0011\u0019\t\t\u0002\u0001C\u0001!\u0006\u00015\u000f[8vY\u0012$&/\u001e8dCR,Gk\\%oSRL\u0017\r\u001c$fi\u000eDwJ\u001a4tKRLe\rT3bI\u0016\u0014(+\u001a;ve:\u001cXK\u001c3fM&tW\rZ(gMN,G\u000fK\u0002\u0002\u0010yCa!a\u0006\u0001\t\u0003\u0001\u0016!M:i_VdG\rU8mY&sG-\u001a4j]&$X\r\\=JM2+\u0017\rZ3s%\u0016$XO\u001d8t\u0003:LX\t_2faRLwN\u001c\u0015\u0004\u0003+q\u0006BBA\u000f\u0001\u0011\u0005\u0001+A\u0016tQ>,H\u000eZ'pm\u0016\u0004\u0016M\u001d;ji&|gn](vi>3GK];oG\u0006$\u0018N\\4M_\u001e\u001cF/\u0019;fQ\r\tYB\u0018\u0005\u0007\u0003G\u0001A\u0011\u0001)\u0002qMDw.\u001e7e\r&dG/\u001a:QCJ$\u0018\u000e^5p]Nl\u0015\rZ3MK\u0006$WM\u001d#ve&tw\rT3bI\u0016\u0014X\t]8dQJ+\u0017/^3ti\"\u001a\u0011\u0011\u00050\t\r\u0005%\u0002\u0001\"\u0001Q\u0003!\u001b\bn\\;mI\u000e\u000bGo\u00195Fq\u000e,\u0007\u000f^5p]\u001a\u0013x.\u001c\"m_\u000e\\\u0017N\\4TK:$w\u000b[3o'\",H\u000f^5oO\u0012{wO\u001c*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\rK\u0002\u0002(yCa!a\f\u0001\t\u0003\u0001\u0016AJ:i_VdG-\u00169eCR,'+Z1tg&<g.\\3oi\nKH/Z:J]6+GO]5dg\"\u001a\u0011Q\u00060\t\r\u0005U\u0002\u0001\"\u0001Q\u0003\u0019\u001b\bn\\;mI:{G/\u00169eCR,'+Z1tg&<g.\\3oi\nKH/Z:J]6+GO]5dg^CWM\u001c(p%\u0016\f7o]5h]6,g\u000e^:J]B\u0013xn\u001a:fgND3!a\r_\u0011\u001d\tY\u0004\u0001C\u0005\u0003{\ta$Y:tKJ$\bK]8dKN\u001c\b+\u0019:uSRLwN\u001c#bi\u0006<\u0006.\u001a8\u0015\u0007E\u000by\u0004C\u0004\u0002B\u0005e\u0002\u0019\u00018\u0002\u001b%\u001c(+Z1tg&<g.\u001b8h\u0011\u001d\t)\u0005\u0001C\u0001\u0003\u000f\nAa\u001d;vER9\u0011+!\u0013\u0002T\u0005u\u0003\u0002CA&\u0003\u0007\u0002\r!!\u0014\u0002\u0013A\f'\u000f^5uS>t\u0007cA\u0018\u0002P%\u0019\u0011\u0011\u000b\u0019\u0003\u0013A\u000b'\u000f^5uS>t\u0007\u0002CA+\u0003\u0007\u0002\r!a\u0016\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4feB\u0019!#!\u0017\n\u0007\u0005m#A\u0001\bSKBd\u0017nY1NC:\fw-\u001a:\t\u0011\u0005}\u00131\ta\u0001\u0003C\n1\u0001\\8h!\u0011\t\u0019'a\u001a\u000e\u0005\u0005\u0015$bAA0\t%!\u0011\u0011NA3\u0005\raun\u001a\u0005\n\u0003[\u0002\u0011\u0013!C\u0005\u0003_\n\u0001d\u001c4gg\u0016$\u0018I\u001c3Fa>\u001c\u0007\u000e\n3fM\u0006,H\u000e\u001e\u00133+\t\t\tHK\u0002L\u0003gZ#!!\u001e\u0011\t\u0005]\u0014\u0011Q\u0007\u0003\u0003sRA!a\u001f\u0002~\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003\u007fR\u0011AC1o]>$\u0018\r^5p]&!\u00111QA=\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a")
/* loaded from: input_file:kafka/server/ReplicaFetcherThreadTest.class */
public class ReplicaFetcherThreadTest {
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    private final FailedPartitions failedPartitions = new FailedPartitions();

    private TopicPartition t1p0() {
        return this.t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private TopicPartition t2p1() {
        return this.t2p1;
    }

    private BrokerEndPoint brokerEndPoint() {
        return this.brokerEndPoint;
    }

    private FailedPartitions failedPartitions() {
        return this.failedPartitions;
    }

    private OffsetAndEpoch offsetAndEpoch(long j, int i) {
        return new OffsetAndEpoch(j, i);
    }

    private int offsetAndEpoch$default$2() {
        return 1;
    }

    @After
    public void cleanup() {
        TestUtils$.MODULE$.clearYammerMetrics();
    }

    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.mock(ReplicaManager.class);
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay(new Object[]{replicaManager});
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), QuotaFactory$UnboundedQuota$.MODULE$, None$.MODULE$);
        Assert.assertEquals(ApiKeys.FETCH.latestVersion(), replicaFetcherThread.fetchRequestVersion());
        Assert.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), replicaFetcherThread.offsetForLeaderEpochRequestVersion());
        Assert.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), replicaFetcherThread.listOffsetRequestVersion());
    }

    @Test
    public void shouldFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).once();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).once();
        EasyMock.expect(log.latestEpoch()).andReturn(None$.MODULE$).once();
        EasyMock.expect(log.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(0L, 5))).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).times(3);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, log});
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new Some(new ReplicaFetcherMockBlockingSend((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new EpochEndOffset(5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime())));
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), offsetAndEpoch(0L, offsetAndEpoch$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), offsetAndEpoch(0L, offsetAndEpoch$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), offsetAndEpoch(0L, offsetAndEpoch$default$2()))})));
        assertPartitionStates(replicaFetcherThread, false, true, false);
        replicaFetcherThread.doWork();
        Assert.assertEquals(1L, r0.epochFetchCount());
        Assert.assertEquals(1L, r0.fetchCount());
        assertPartitionStates(replicaFetcherThread, true, false, false);
        replicaFetcherThread.doWork();
        Assert.assertEquals(1L, r0.epochFetchCount());
        Assert.assertEquals(2L, r0.fetchCount());
        assertPartitionStates(replicaFetcherThread, true, false, false);
        replicaFetcherThread.doWork();
        Assert.assertEquals(1L, r0.epochFetchCount());
        Assert.assertEquals(3L, r0.fetchCount());
        assertPartitionStates(replicaFetcherThread, true, false, false);
        EasyMock.verify(new Object[]{logManager});
    }

    public void assertPartitionStates(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3) {
        List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{t1p0(), t1p1(), t2p1()})).foreach(new ReplicaFetcherThreadTest$$anonfun$assertPartitionStates$1(this, abstractFetcherThread, z, z2, z3));
    }

    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        BlockingSend blockingSend = (BlockingSend) EasyMock.createMock(BlockingSend.class);
        EasyMock.expect(blockingSend.sendRequest((AbstractRequest.Builder) EasyMock.anyObject())).andThrow(new NullPointerException()).once();
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.mock(ReplicaManager.class);
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay(new Object[]{blockingSend, replicaManager});
        Assert.assertEquals("results from leader epoch request should have undefined offset", Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L))})), new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), (ReplicaQuota) null, new Some(blockingSend)).fetchEpochEndOffsets(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0))}))));
        EasyMock.verify(new Object[]{blockingSend});
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBoth() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(0L, 5))).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).times(2);
        EasyMock.replay(new Object[]{replicaManager, logManager, partition, log});
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), QuotaFactory$UnboundedQuota$.MODULE$, new Some(new ReplicaFetcherMockBlockingSend((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new EpochEndOffset(5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime())));
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), offsetAndEpoch(0L, offsetAndEpoch$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), offsetAndEpoch(0L, offsetAndEpoch$default$2()))})));
        replicaFetcherThread.doWork();
        Assert.assertEquals(1L, r0.epochFetchCount());
        Assert.assertEquals(1L, r0.fetchCount());
        replicaFetcherThread.doWork();
        Assert.assertEquals(1L, r0.epochFetchCount());
        Assert.assertEquals(2L, r0.fetchCount());
        replicaFetcherThread.doWork();
        Assert.assertEquals(1L, r0.epochFetchCount());
        Assert.assertEquals(3L, r0.fetchCount());
        EasyMock.verify(new Object[]{logManager});
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        Seq seq = (Seq) TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map(new ReplicaFetcherThreadTest$$anonfun$1(this), Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(200 - 1)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(200, 5))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(200)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, log});
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), (KafkaConfig) seq.head(), failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new Some(new ReplicaFetcherMockBlockingSend((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(5, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), new EpochEndOffset(5, 172L))}))).asJava(), brokerEndPoint(), new SystemTime())));
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), offsetAndEpoch(0L, offsetAndEpoch$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), offsetAndEpoch(0L, offsetAndEpoch$default$2()))})));
        replicaFetcherThread.doWork();
        Assert.assertTrue(new StringBuilder().append("Expected ").append(t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(newCapture.getValues()).append(")").toString(), ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(newCapture.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(156)));
        Assert.assertTrue(new StringBuilder().append("Expected ").append(t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(newCapture.getValues()).append(")").toString(), ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(newCapture.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(172)));
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        Seq seq = (Seq) TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map(new ReplicaFetcherThreadTest$$anonfun$2(this), Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(200 - 3)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(4)).andReturn(None$.MODULE$).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(200)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, log});
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), (KafkaConfig) seq.head(), failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new Some(new ReplicaFetcherMockBlockingSend((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(4, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), new EpochEndOffset(4, 202L))}))).asJava(), brokerEndPoint(), new SystemTime())));
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), offsetAndEpoch(0L, offsetAndEpoch$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), offsetAndEpoch(0L, offsetAndEpoch$default$2()))})));
        replicaFetcherThread.doWork();
        Assert.assertTrue(new StringBuilder().append("Expected ").append(t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(newCapture.getValues()).append(")").toString(), ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(newCapture.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(156)));
        Assert.assertTrue(new StringBuilder().append("Expected ").append(t2p1()).append(" to truncate to offset ").append(BoxesRunTime.boxToInteger(200)).append(" (truncation offsets: ").append(newCapture.getValues()).append(")").toString(), ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(newCapture.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(200)));
    }

    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(200 - 2)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(3)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(200)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, log});
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(4, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new EpochEndOffset(4, 143L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new Some(replicaFetcherMockBlockingSend));
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), offsetAndEpoch(0L, offsetAndEpoch$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), offsetAndEpoch(0L, offsetAndEpoch$default$2()))})));
        replicaFetcherThread.doWork();
        Assert.assertEquals(1L, replicaFetcherMockBlockingSend.epochFetchCount());
        Assert.assertEquals(0L, replicaFetcherMockBlockingSend.fetchCount());
        replicaFetcherMockBlockingSend.setOffsetsForNextResponse((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(3, 101L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new EpochEndOffset(3, 102L))}))).asJava());
        replicaFetcherThread.doWork();
        Assert.assertEquals(2L, replicaFetcherMockBlockingSend.epochFetchCount());
        Assert.assertEquals(1L, replicaFetcherMockBlockingSend.fetchCount());
        Assert.assertEquals("OffsetsForLeaderEpochRequest version.", 3L, replicaFetcherMockBlockingSend.lastUsedOffsetForLeaderEpochVersion());
        replicaFetcherThread.doWork();
        Assert.assertEquals(2L, replicaFetcherMockBlockingSend.epochFetchCount());
        Assert.assertEquals(2L, replicaFetcherMockBlockingSend.fetchCount());
        Assert.assertTrue(new StringBuilder().append("Expected ").append(t1p1()).append(" to truncate to offset 102 (truncation offsets: ").append(newCapture.getValues()).append(")").toString(), ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(newCapture.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(102)));
        Assert.assertTrue(new StringBuilder().append("Expected ").append(t1p0()).append(" to truncate to offset 101 (truncation offsets: ").append(newCapture.getValues()).append(")").toString(), ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(newCapture.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(101)));
    }

    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        createBrokerConfig.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(200 - 2)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(3)).andReturn(new Some(new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(200)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, log});
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new Some(new ReplicaFetcherMockBlockingSend((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(-1, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new EpochEndOffset(-1, 143L))}))).asJava(), brokerEndPoint(), new SystemTime())));
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), offsetAndEpoch(0L, offsetAndEpoch$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), offsetAndEpoch(0L, offsetAndEpoch$default$2()))})));
        replicaFetcherThread.doWork();
        Assert.assertEquals(1L, r0.epochFetchCount());
        Assert.assertEquals(1L, r0.fetchCount());
        Assert.assertEquals("OffsetsForLeaderEpochRequest version.", 0L, r0.lastUsedOffsetForLeaderEpochVersion());
        replicaFetcherThread.doWork();
        Assert.assertEquals(1L, r0.epochFetchCount());
        Assert.assertEquals(2L, r0.fetchCount());
        Assert.assertTrue(new StringBuilder().append("Expected ").append(t1p0()).append(" to truncate to offset 155 (truncation offsets: ").append(newCapture.getValues()).append(")").toString(), ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(newCapture.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(155)));
        Assert.assertTrue(new StringBuilder().append("Expected ").append(t1p1()).append(" to truncate to offset 143 (truncation offsets: ").append(newCapture.getValues()).append(")").toString(), ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(newCapture.getValues()).asScala()).contains(BoxesRunTime.boxToInteger(143)));
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        Seq seq = (Seq) TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map(new ReplicaFetcherThreadTest$$anonfun$3(this), Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(100)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5)));
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, log});
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), (KafkaConfig) seq.head(), failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new Some(new ReplicaFetcherMockBlockingSend((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(-1, -1L))}))).asJava(), brokerEndPoint(), new SystemTime())));
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), offsetAndEpoch(100, offsetAndEpoch$default$2()))})));
        replicaFetcherThread.doWork();
        Assert.assertEquals(100, BoxesRunTime.unboxToLong(newCapture.getValue()));
    }

    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        Seq seq = (Seq) TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map(new ReplicaFetcherThreadTest$$anonfun$4(this), Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(100)).anyTimes();
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(300, 5))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(300)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, log});
        Map map = (Map) JavaConverters$.MODULE$.mutableMapAsJavaMapConverter(scala.collection.mutable.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))).asJava();
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), (KafkaConfig) seq.head(), failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new Some(new ReplicaFetcherMockBlockingSend(map, brokerEndPoint(), new SystemTime())));
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), offsetAndEpoch(0L, offsetAndEpoch$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), offsetAndEpoch(0L, offsetAndEpoch$default$2()))})));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(new ReplicaFetcherThreadTest$$anonfun$shouldPollIndefinitelyIfLeaderReturnsAnyException$1(this, replicaFetcherThread));
        Assert.assertEquals(0L, newCapture.getValues().size());
        map.put(t1p0(), new EpochEndOffset(5, 156L));
        replicaFetcherThread.doWork();
        Assert.assertEquals(156L, BoxesRunTime.unboxToLong(newCapture.getValue()));
    }

    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);
        partition.truncateTo(0L, false);
        EasyMock.expect(BoxedUnit.UNIT).times(2);
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(0L)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(4))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(4)).andReturn(new Some(new OffsetAndEpoch(0L, 4))).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        stub(partition, replicaManager, log);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, log});
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new Some(new ReplicaFetcherMockBlockingSend((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(4, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new EpochEndOffset(4, 1L))}))).asJava(), brokerEndPoint(), new SystemTime())));
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), offsetAndEpoch(0L, offsetAndEpoch$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), offsetAndEpoch(0L, offsetAndEpoch$default$2()))})));
        Assert.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), replicaFetcherThread.fetchState(t1p0()).map(new ReplicaFetcherThreadTest$$anonfun$shouldMovePartitionsOutOfTruncatingLogState$1(this)));
        Assert.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), replicaFetcherThread.fetchState(t1p1()).map(new ReplicaFetcherThreadTest$$anonfun$shouldMovePartitionsOutOfTruncatingLogState$2(this)));
        replicaFetcherThread.doWork();
        Assert.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), replicaFetcherThread.fetchState(t1p0()).map(new ReplicaFetcherThreadTest$$anonfun$shouldMovePartitionsOutOfTruncatingLogState$3(this)));
        Assert.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), replicaFetcherThread.fetchState(t1p1()).map(new ReplicaFetcherThreadTest$$anonfun$shouldMovePartitionsOutOfTruncatingLogState$4(this)));
    }

    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(newCapture)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).once();
        EasyMock.expect(partition.localLogOrException()).andReturn(log).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.highWatermark())).andReturn(BoxesRunTime.boxToLong(100 - 2)).anyTimes();
        EasyMock.expect(log.latestEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(5))).anyTimes();
        EasyMock.expect(log.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(100, 5))).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(log.logEndOffset())).andReturn(BoxesRunTime.boxToLong(100)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.replicaAlterLogDirsManager()).andReturn(replicaAlterLogDirsManager).anyTimes();
        stub(partition, replicaManager, log);
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, log});
        ReplicaFetcherMockBlockingSend replicaFetcherMockBlockingSend = new ReplicaFetcherMockBlockingSend((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new EpochEndOffset(5, 52L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new EpochEndOffset(5, 49L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new Some(replicaFetcherMockBlockingSend));
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), offsetAndEpoch(0L, offsetAndEpoch$default$2())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), offsetAndEpoch(0L, offsetAndEpoch$default$2()))})));
        replicaFetcherMockBlockingSend.setEpochRequestCallback(new ReplicaFetcherThreadTest$$anonfun$shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest$1(this, replicaFetcherThread, t1p0()));
        replicaFetcherThread.doWork();
        Assert.assertEquals(49L, BoxesRunTime.unboxToLong(newCapture.getValue()));
    }

    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        BlockingSend blockingSend = (BlockingSend) EasyMock.createMock(BlockingSend.class);
        blockingSend.initiateClose();
        EasyMock.expect(BoxedUnit.UNIT).andThrow(new IllegalArgumentException()).once();
        blockingSend.close();
        EasyMock.expect(BoxedUnit.UNIT).andThrow(new IllegalStateException()).once();
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.mock(ReplicaManager.class);
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay(new Object[]{blockingSend, replicaManager});
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), (ReplicaQuota) null, new Some(blockingSend));
        replicaFetcherThread.start();
        replicaFetcherThread.initiateShutdown();
        replicaFetcherThread.awaitShutdown();
        EasyMock.verify(new Object[]{blockingSend});
    }

    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        assertProcessPartitionDataWhen(true);
    }

    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        assertProcessPartitionDataWhen(false);
    }

    private void assertProcessPartitionDataWhen(boolean z) {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        BlockingSend blockingSend = (BlockingSend) EasyMock.createNiceMock(BlockingSend.class);
        Log log = (Log) EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition) EasyMock.createNiceMock(Partition.class);
        EasyMock.expect(partition.localLogOrException()).andReturn(log);
        EasyMock.expect(BoxesRunTime.boxToBoolean(partition.isReassigning())).andReturn(BoxesRunTime.boxToBoolean(z));
        EasyMock.expect(BoxesRunTime.boxToBoolean(partition.isAddingLocalReplica())).andReturn(BoxesRunTime.boxToBoolean(z));
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);
        EasyMock.expect(replicaManager.nonOfflinePartition((TopicPartition) EasyMock.anyObject())).andReturn(new Some(partition));
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(brokerTopicStats).anyTimes();
        ReplicaQuota replicaQuota = (ReplicaQuota) EasyMock.createNiceMock(ReplicaQuota.class);
        EasyMock.replay(new Object[]{blockingSend, replicaManager, partition, log, replicaQuota});
        new ReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), replicaManager, new Metrics(), new SystemTime(), replicaQuota, new Some(blockingSend)).processPartitionData(t1p0(), 0L, new FetchResponse.PartitionData(Errors.NONE, 0L, 0L, 0L, Optional.empty(), Collections.emptyList(), MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))})));
        if (z) {
            Assert.assertEquals(r0.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        } else {
            Assert.assertEquals(0L, ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        }
        Assert.assertEquals(r0.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().replicationBytesInRate().get()).count());
    }

    public void stub(Partition partition, ReplicaManager replicaManager, Log log) {
        EasyMock.expect(replicaManager.localLogOrException(t1p0())).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.nonOfflinePartition(t1p0())).andReturn(new Some(partition)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(t1p1())).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.nonOfflinePartition(t1p1())).andReturn(new Some(partition)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(t2p1())).andReturn(log).anyTimes();
        EasyMock.expect(replicaManager.nonOfflinePartition(t2p1())).andReturn(new Some(partition)).anyTimes();
    }
}
