package kafka.cluster;

import io.confluent.kafka.replication.push.PushSession;
import io.confluent.kafka.replication.push.ReplicationState;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.Defaults$;
import kafka.server.LogDeleteRecordsResult;
import kafka.server.RequestLocal$;
import kafka.server.link.TopicLinkMirror$;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.FencedReplicationSessionIdException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.PushReplicationStartedException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogReadInfo;
import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.MapLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: PushReplicationPartitionTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\rf\u0001B\u001a5\u0001eBQA\u0010\u0001\u0005\u0002}BQ!\u0011\u0001\u0005B\tCq!\u0017\u0001C\u0002\u0013\u0005#\f\u0003\u0004d\u0001\u0001\u0006Ia\u0017\u0005\u0006I\u0002!\t!\u001a\u0005\u0006m\u0002!\t!\u001a\u0005\u0006q\u0002!\t!\u001a\u0005\u0006u\u0002!\t!\u001a\u0005\u0006y\u0002!\t!\u001a\u0005\u0006}\u0002!\t!\u001a\u0005\u0007\u0003\u0003\u0001A\u0011A3\t\u000f\u0005\u0015\u0001\u0001\"\u0001\u0002\b!1\u0011q\u0007\u0001\u0005\u0002\u0015Da!a\u000f\u0001\t\u0003)\u0007BBA \u0001\u0011\u0005Q\r\u0003\u0004\u0002D\u0001!\t!\u001a\u0005\u0007\u0003\u000f\u0002A\u0011A3\t\u000f\u0005-\u0003\u0001\"\u0001\u0002N!1\u0011q\u0010\u0001\u0005\u0002\u0015Da!a!\u0001\t\u0003)\u0007BBAD\u0001\u0011\u0005Q\r\u0003\u0004\u0002\f\u0002!\t!\u001a\u0005\u0007\u0003\u001f\u0003A\u0011A3\t\r\u0005M\u0005\u0001\"\u0001f\u0011\u0019\t9\n\u0001C\u0001K\"1\u00111\u0014\u0001\u0005\u0002\u0015Da!a(\u0001\t\u0003)\u0007BBAR\u0001\u0011\u0005Q\r\u0003\u0004\u0002(\u0002!\t!\u001a\u0005\u0007\u0003W\u0003A\u0011A3\t\r\u0005=\u0006\u0001\"\u0001f\u0011\u0019\t\u0019\f\u0001C\u0001K\"1\u0011q\u0017\u0001\u0005\u0002\u0015Da!a/\u0001\t\u0003)\u0007BBA`\u0001\u0011\u0005Q\r\u0003\u0004\u0002D\u0002!\t!\u001a\u0005\u0007\u0003\u000f\u0004A\u0011A3\t\u000f\u0005-\u0007\u0001\"\u0003\u0002N\"1\u0011Q\u001c\u0001\u0005\u0002\u0015Da!!9\u0001\t\u0003)\u0007BBAs\u0001\u0011\u0005Q\r\u0003\u0004\u0002j\u0002!\t!\u001a\u0005\u0007\u0003[\u0004A\u0011A3\t\u000f\u0005E\b\u0001\"\u0003\u0002t\"9!Q\u0001\u0001\u0005\n\t\u001d\u0001b\u0002B\u0016\u0001\u0011%!Q\u0006\u0005\b\u0005#\u0002A\u0011\u0002B*\u0011\u001d\u0011i\u0006\u0001C\u0005\u0005?BqA!#\u0001\t\u0013\u0011Y\tC\u0004\u0003\u0018\u0002!IA!'\u00039A+8\u000f\u001b*fa2L7-\u0019;j_:\u0004\u0016M\u001d;ji&|g\u000eV3ti*\u0011QGN\u0001\bG2,8\u000f^3s\u0015\u00059\u0014!B6bM.\f7\u0001A\n\u0003\u0001i\u0002\"a\u000f\u001f\u000e\u0003QJ!!\u0010\u001b\u0003+\u0005\u00137\u000f\u001e:bGR\u0004\u0016M\u001d;ji&|g\u000eV3ti\u00061A(\u001b8jiz\"\u0012\u0001\u0011\t\u0003w\u0001\tAd\u0019:fCR,\u0007+^:i%\u0016\u0004H.[2bi&|g.T1oC\u001e,'/F\u0001D!\r!u)S\u0007\u0002\u000b*\ta)A\u0003tG\u0006d\u0017-\u0003\u0002I\u000b\n1q\n\u001d;j_:\u0004\"A\u0013,\u000f\u0005-\u001bfB\u0001'R\u001d\ti\u0005+D\u0001O\u0015\ty\u0005(\u0001\u0004=e>|GOP\u0005\u0002o%\u0011!KN\u0001\u0006kRLGn]\u0005\u0003)V\u000b\u0011\u0002V3tiV#\u0018\u000e\\:\u000b\u0005I3\u0014BA,Y\u0005=iunY6QkNDW*\u00198bO\u0016\u0014(B\u0001+V\u00035iW\r^1eCR\f7)Y2iKV\t1\f\u0005\u0002]C6\tQL\u0003\u0002_?\u0006AQ.\u001a;bI\u0006$\u0018M\u0003\u0002am\u000511/\u001a:wKJL!AY/\u0003%-\u0013\u0016M\u001a;NKR\fG-\u0019;b\u0007\u0006\u001c\u0007.Z\u0001\u000f[\u0016$\u0018\rZ1uC\u000e\u000b7\r[3!\u0003Y\"Xm\u001d;NC.,G*Z1eKJ\u0014Vm]3ugJ+\u0007\u000f\\5dCRLwN\\*fgNLwN\\(o\u001d\u0016<H*Z1eKJ,\u0005o\\2i)\u00051\u0007C\u0001#h\u0013\tAWI\u0001\u0003V]&$\bFA\u0003k!\tYG/D\u0001m\u0015\tig.A\u0002ba&T!a\u001c9\u0002\u000f),\b/\u001b;fe*\u0011\u0011O]\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002g\u0006\u0019qN]4\n\u0005Ud'\u0001\u0002+fgR\f\u0011\u0007^3ti6\u000b7.\u001a'fC\u0012,'/\u00128egB+8\u000f[*fgNLwN\\:XQ\u0016tg)\u001a8dS:<'+\u001a9mS\u000e\f7\u000f\u000b\u0002\u0007U\u0006\tD/Z:u\u001b\u0006\\W\rT3bI\u0016\u0014XI\u001c3t!V\u001c\bnU3tg&|gn\u00165f]J+\u0007\u000f\\5dCN\u001c\u0006.\u001e;E_^t\u0007FA\u0004k\u0003Q\"Xm\u001d;NC.,G*Z1eKJ,e\u000eZ:QkND7+Z:tS>tw\u000b[3o!J|Wn\u001c;j]\u001ed\u0015N\\6MK\u0006$WM\u001d\u0015\u0003\u0011)\f!\u0006^3ti\u0006cG/\u001a:QCJ$\u0018\u000e^5p]&\u001b(o\u00155sS:\\WI\u001c3t!V\u001c\bnU3tg&|g\u000e\u000b\u0002\nU\u0006\u0019C/Z:u\t\u0016dW\r^3QCJ$\u0018\u000e^5p]\u0016sGm\u001d)vg\"\u001cVm]:j_:\u001c\bF\u0001\u0006k\u0003!\"Xm\u001d;NCJ\\\u0007+\u0019:uSRLwN\\(gM2Lg.Z#oIN\u0004Vo\u001d5TKN\u001c\u0018n\u001c8tQ\tY!.A\u0015uKN$8i\u001c8dkJ\u0014XM\u001c;NC.,G*Z1eKJ<\u0016\u000e\u001e5DCV<\u0007\u000e^+q\r\u0016$8\r\u001b\u000b\u0004M\u0006%\u0001bBA\u0006\u0019\u0001\u0007\u0011QB\u0001\u000b[\u0006\\W\rT3bI\u0016\u0014\bc\u0001#\u0002\u0010%\u0019\u0011\u0011C#\u0003\u000f\t{w\u000e\\3b]\":A\"!\u0006\u0002&\u0005\u001d\u0002\u0003BA\f\u0003Ci!!!\u0007\u000b\t\u0005m\u0011QD\u0001\taJ|g/\u001b3fe*\u0019\u0011q\u00048\u0002\rA\f'/Y7t\u0013\u0011\t\u0019#!\u0007\u0003\u0017Y\u000bG.^3T_V\u00148-Z\u0001\tE>|G.Z1og2\"\u0011\u0011FA\u00163\u0005\u0001\u0011$A\u0001)\u00071\ty\u0003\u0005\u0003\u00022\u0005MRBAA\u000f\u0013\u0011\t)$!\b\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0001\u0014uKN$X*Y6f\r>dGn\\<fe\u0016sGm\u001d*fa2L7-\u0019;j_:\u001cVm]:j_:D#!\u00046\u0002eQ,7\u000f^\"p]\u000e,(O]3oi\u000e\u000bWo\u001a5u+B4U\r^2i\u0003:$g*Z<SKBd\u0017nY1Fa>\u001c\u0007NR3uG\"D#A\u00046\u0002iQ,7\u000f^\"p]\u000e,(O]3oi\u000e\u000bWo\u001a5u+B4U\r^2i\u0003:$g*Z<SKBd\u0017nY1TKN\u001c\u0018n\u001c8GKR\u001c\u0007\u000e\u000b\u0002\u0010U\u0006YC/Z:u\r\u0016$8\r\u001b+sC:\u001c\u0018\u000e^5p]N$v\u000eU;tQ^CWM\u001c$vY2L8)Y;hQR,\u0006\u000f\u000b\u0002\u0011U\u0006\u0001D/Z:u\r\u0016$8\r\u001b+sC:\u001c\u0018\u000e^5p]N$v\u000eU;tQ>sG._,ji\"$UMZ5oK\u0012$v\u000e]5d\u0013\u0012D#!\u00056\u0002]Q,7\u000f\u001e$fi\u000eDGi\\3t\u001d>$HK]1og&$\u0018n\u001c8J]R,'O\\1m)>\u0004\u0018nY:U_B+8\u000f\u001b\u000b\u0004M\u0006=\u0003bBA)%\u0001\u0007\u00111K\u0001\u0006i>\u0004\u0018n\u0019\t\u0005\u0003+\niF\u0004\u0003\u0002X\u0005e\u0003CA'F\u0013\r\tY&R\u0001\u0007!J,G-\u001a4\n\t\u0005}\u0013\u0011\r\u0002\u0007'R\u0014\u0018N\\4\u000b\u0007\u0005mS\tK\u0004\u0013\u0003+\t)'a\u001a\u0002\u000fM$(/\u001b8hg2R\u0011\u0011NA7\u0003c\n)(!\u001f\"\u0005\u0005-\u0014AE0`G>t7/^7fe~{gMZ:fiN\f#!a\u001c\u0002'}{FO]1og\u0006\u001cG/[8o?N$\u0018\r^3\"\u0005\u0005M\u0014!F0d_:4G.^3oi6\"\u0018.\u001a:.gR\fG/Z\u0011\u0003\u0003o\n\u0001dX2p]\u001adW/\u001a8u[1Lgn[\u0017nKR\fG-\u0019;bC\t\tY(A\t`G>tg\r\\;f]Rl\u0013/^8uCND3AEA\u0018\u00031\"Xm\u001d;GKR\u001c\u0007\u000e\u0016:b]NLG/[8ogR{\u0007+^:i/\",g.\u00138D_6l\u0017\u000e\u001e;fI&\u001b(\u000f\u000b\u0002\u0014U\u0006!D/Z:u\r\u0016$8\r[,ji\"LeN^1mS\u0012\u001cVm]:j_:LE\rR8fg:{G\u000f\u0016:b]NLG/[8o)>\u0004Vo\u001d5)\u0005QQ\u0017!\u0010;fgR4U\r^2i/&$\bNR5oC2\u0014V\r\u001d7jG\u0006$\u0018n\u001c8TKN\u001c\u0018n\u001c8JI\u0012{Wm\u001d(piR\u0013\u0018M\\:ji&|g\u000eV8QkND\u0007FA\u000bk\u0003Y#Xm\u001d;GKR\u001c\u0007nV5uQ\u001aKg.\u00197SKBd\u0017nY1uS>t7+Z:tS>t\u0017\n\u001a+sC:\u001c\u0018\u000e^5p]N$v\u000eU;mY\u0006sGmQ1o\u001d\u00164XM\u001d+sC:\u001c\u0018\u000e^5p]\n\u000b7m\u001b+p!V\u001c\b\u000e\u000b\u0002\u0017U\u0006\u0011D/Z:u\r\u0016$8\r[,ji\"$\u0015N^3sO&tw-\u00129pG\"$u.Z:O_R$&/\u00198tSRLwN\u001c+p!VdG\u000e\u000b\u0002\u0018U\u0006iC/Z:u\r\u0016$8\r[,ji\"tUm\u001e*fa2L7-Y#q_\u000eDGK]1og&$\u0018n\u001c8t)>\u0004V\u000f\u001c7)\u0005aQ\u0017a\u000e;fgR4U\r^2i/&$\bNT3x%\u0016\u0004H.[2b\u000bB|7\r[+qI\u0006$Xm\u001d*fa2L7-\u0019;j_:\u001cVm]:j_:LE\r\u000b\u0002\u001aU\u0006\u0019D/Z:u\r\u0016$8\r[,ji\"tUm\u001e*fa2L7-\u0019;j_:\u001cVm]:j_:$&/\u00198tSRLwN\\:U_B+H\u000e\u001c\u0015\u00035)\fQ\t^3ti\u001a{G\u000e\\8xKJ4U\r^2i\u0003\u001a$XM\u001d+sC:\u001c\u0018\u000e^5p]RC'o\\<t!V\u001c\bNU3qY&\u001c\u0017\r^5p]N#\u0018M\u001d;fI\u0016C8-\u001a9uS>t\u0007FA\u000ek\u0003)#Xm\u001d;G_2dwn^3s\r\u0016$8\r[,ji\"$&/\u00198tSRLwN\\+qI\u0006$Xm\u001d$fi\u000eD7\u000b^1uK\u0006sG-\u00138de\u0016lWM\u001c;t\u0011&<\u0007nV1uKJl\u0017M]6)\u0005qQ\u0017A\f;fgR,\u0006\u000fZ1uK\u001a{G\u000e\\8xKJ4U\r^2i'R\fG/\u001a$f]\u000e,7o\u0015;bY\u00164U\r^2iKND#!\b6\u0002KQ,7\u000f^(o!V\u001c\bnU3tg&|g.\u00128eK\u00124uN]*uC2,7+Z:tS>t\u0007F\u0001\u0010k\u0003=\"Xm\u001d;P]B+8\u000f[*fgNLwN\\#oI\u0016$W\u000b\u001d3bi\u0016\u001c(+\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8oQ\ty\".\u0001\u0016uKN$xJ\u001c)vg\"\u001cVm]:j_:,e\u000eZ3e/\",g\u000eU1si&$\u0018n\u001c8EK2,G/\u001a3)\u0005\u0001R\u0017a\f;fgR|e.\u00119qK:$'+Z2pe\u0012\u001c(+Z:q_:\u001cX-\u00169eCR,7\u000fS5hQ^\u000bG/\u001a:nCJ\\\u0007FA\u0011k\u00039\"Xm\u001d;P]\u0006\u0003\b/\u001a8e%\u0016\u001cwN\u001d3t%\u0016\u001c\bo\u001c8tKV\u0003H-\u0019;fg2{woV1uKJl\u0017M]6)\u0005\tR\u0017!\f;fgR|e.\u00119qK:$'+Z2pe\u0012\u001c(+Z:q_:\u001cXmQ8na2,G/Z:QkJ<\u0017\r^8ss\"\u00121E[\u00010i\u0016\u001cHo\u00148BaB,g\u000e\u001a*fG>\u0014Hm\u001d*fgB|gn]3XSRD7\u000b^1mKB+8\u000f[*fgNLwN\u001c\u0015\u0003I)\fq\u0006^3ti>s\u0017\t\u001d9f]\u0012\u0014VmY8sIN\u0014Vm\u001d9p]N,w+\u001b;i\t\u0016dW\r^3e!\u0006\u0014H/\u001b;j_:D#!\n6\u0002\u001fM,W\r\u001a'pO^KG\u000f\u001b#bi\u0006$2AZAh\u0011\u001d\t\tN\na\u0001\u0003'\f1\u0001\\8h!\u0011\t).!7\u000e\u0005\u0005]'bAAim%!\u00111\\Al\u0005-\t%m\u001d;sC\u000e$Hj\\4\u0002\u007fQ,7\u000f\u001e'fC\u0012,'\u000fT8h\u0003B\u0004XM\u001c3MSN$XM\\3s\u001d>$\u0018NZ5fgB+8\u000f\u001b*fa2L7-Y:G_JdU-\u00193fe\u0006\u0003\b/\u001a8eg\"\u0012qE[\u0001Hi\u0016\u001cH\u000fT3bI\u0016\u0014xJ\u001a4tKR\u001cH*[:uK:,'OT8uS\u001aLWm\u001d)vg\"\u0014V\r\u001d7jG\u0006\u001chi\u001c:M_\u001e\u001cF/\u0019:u\u001f\u001a47/\u001a;J]\u000e\u0014X-\\3oi\"\u0012\u0001F[\u0001Gi\u0016\u001cH\u000fT3bI\u0016\u0014xJ\u001a4tKR\u001cH*[:uK:,'OT8uS\u001aLWm\u001d)vg\"\u0014V\r\u001d7jG\u0006\u001chi\u001c:IS\u001eDw+\u0019;fe6\f'o[%oGJ,W.\u001a8uQ\tI#.\u0001\u0019uKN$H*Z1eKJdunZ!qa\u0016tG\rT5ti\u0016tWM]%h]>\u0014Xm\u001d$pY2|w/\u001a:Fm\u0016tGo\u001d\u0015\u0003U)\f1\b^3ti2+\u0017\rZ3s\u0019><\u0017\t\u001d9f]\u0012d\u0015n\u001d;f]\u0016\u0014\u0018J\\2mk\u0012,7\u000f\u0016:b]N\f7\r^5p]6\u000b'o[3s\u0003B\u0004XM\u001c3tQ\tY#.A\u0006baB,g\u000e\u001a+p\u0019><Gc\u00024\u0002v\u0006](\u0011\u0001\u0005\b\u0003#d\u0003\u0019AAj\u0011\u001d\tI\u0010\fa\u0001\u0003w\f1\u0002\\3bI\u0016\u0014X\t]8dQB\u0019A)!@\n\u0007\u0005}XIA\u0002J]RDqAa\u0001-\u0001\u0004\tY0A\u0003d_VtG/A\tbaB,g\u000e\u001a+p!\u0006\u0014H/\u001b;j_:$bA!\u0003\u0003 \t%\u0002\u0003\u0002B\u0006\u00057i!A!\u0004\u000b\t\t=!\u0011C\u0001\u0007e\u0016\u001cwN\u001d3\u000b\t\tM!QC\u0001\u0007G>lWn\u001c8\u000b\u0007]\u00129BC\u0002\u0003\u001aI\fa!\u00199bG\",\u0017\u0002\u0002B\u000f\u0005\u001b\u0011Q\"T3n_JL(+Z2pe\u0012\u001c\bb\u0002B\u0011[\u0001\u0007!1E\u0001\na\u0006\u0014H/\u001b;j_:\u00042a\u000fB\u0013\u0013\r\u00119\u0003\u000e\u0002\n!\u0006\u0014H/\u001b;j_:DqAa\u0001.\u0001\u0004\tY0\u0001\fgKR\u001c\u0007.Q:DCV<\u0007\u000e^+q%\u0016\u0004H.[2b)!\u0011yC!\u0011\u0003D\t5\u0003\u0003\u0002B\u0019\u0005{i!Aa\r\u000b\t\u0005E'Q\u0007\u0006\u0005\u0005o\u0011I$A\u0005j]R,'O\\1mg*!!1\bB\u000b\u0003\u001d\u0019Ho\u001c:bO\u0016LAAa\u0010\u00034\tYAj\\4SK\u0006$\u0017J\u001c4p\u0011\u001d\u0011\tC\fa\u0001\u0005GAqA!\u0012/\u0001\u0004\u00119%\u0001\u0007sKBd\u0017nY1Fa>\u001c\u0007\u000eE\u0002E\u0005\u0013J1Aa\u0013F\u0005\u0011auN\\4\t\u000f\t=c\u00061\u0001\u0003H\u0005!\"/\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8o\u0013\u0012\f!$Y:tKJ$(+\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8o\u0013\u0012$RA\u001aB+\u00053BqAa\u00160\u0001\u0004\tY0A\u0005sKBd\u0017nY1JI\"9!qJ\u0018A\u0002\tm\u0003\u0003\u0002#H\u0005\u000f\nA$Y:tKJ$(+\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8o\u001b>$W\rF\u0003g\u0005C\u0012\u0019\u0007C\u0004\u0003XA\u0002\r!a?\t\u000f\t\u0015\u0004\u00071\u0001\u0003h\u0005y!/\u001a9mS\u000e\fG/[8o\u001b>$W\r\u0005\u0003\u0003j\t\re\u0002\u0002B6\u0005\u007fj!A!\u001c\u000b\t\t=$\u0011O\u0001\u0005aV\u001c\bN\u0003\u0003\u0003t\tU\u0014a\u0003:fa2L7-\u0019;j_:T1a\u000eB<\u0015\u0011\u0011IHa\u001f\u0002\u0013\r|gN\u001a7vK:$(B\u0001B?\u0003\tIw.\u0003\u0003\u0003\u0002\n5\u0014\u0001\u0005*fa2L7-\u0019;j_:\u001cF/\u0019;f\u0013\u0011\u0011)Ia\"\u0003\t5{G-\u001a\u0006\u0005\u0005\u0003\u0013i'A\fbGRLg/\u001a)vg\"\u001cVm]:j_:|%OR1jYR1!Q\u0012BJ\u0005+\u0003BAa\u001b\u0003\u0010&!!\u0011\u0013B7\u0005-\u0001Vo\u001d5TKN\u001c\u0018n\u001c8\t\u000f\t]\u0013\u00071\u0001\u0002|\"9!qJ\u0019A\u0002\t\u001d\u0013A\u0006<fe&4\u0017\u0010U;tQN+7o]5p]\u0016sG-\u001a3\u0015\u000f\u0019\u0014YJ!(\u0003 \"9!q\u000b\u001aA\u0002\u0005m\bb\u0002B(e\u0001\u0007!q\t\u0005\b\u0005C\u0013\u0004\u0019AA\u0007\u0003Q\u0019\bn\\;mIN+g\u000eZ#oIN+7o]5p]\u0002")
/* loaded from: input_file:kafka/cluster/PushReplicationPartitionTest.class */
public class PushReplicationPartitionTest extends AbstractPartitionTest {
    private final KRaftMetadataCache metadataCache = (KRaftMetadataCache) Mockito.mock(KRaftMetadataCache.class);

    @Override // kafka.cluster.AbstractPartitionTest
    public Option<TestUtils.MockPushManager> createPushReplicationManager() {
        return new Some(new TestUtils.MockPushManager());
    }

    @Override // kafka.cluster.AbstractPartitionTest
    /* renamed from: metadataCache, reason: merged with bridge method [inline-methods] */
    public KRaftMetadataCache mo61metadataCache() {
        return this.metadataCache;
    }

    @Test
    public void testMakeLeaderResetsReplicationSessionOnNewLeaderEpoch() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L);
        });
        activePushSessionOrFail(remoteReplicaId(), 2L);
        int remoteReplicaId = remoteReplicaId() + 1;
        Assertions.assertNotEquals(brokerId(), remoteReplicaId);
        Assertions.assertFalse(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(2).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(false), offsetCheckpoints(), new Some(topicId())), "Expected become leader transition not to change leader");
        activePushSessionOrFail(remoteReplicaId(), 2L);
        Assertions.assertTrue(partition().getReplica(remoteReplicaId()).isDefined());
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(2L)));
        assertReplicationSessionId(remoteReplicaId, new Some(BoxesRunTime.boxToLong(-1L)));
        Assertions.assertFalse(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5 + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(3).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(false), offsetCheckpoints(), new Some(topicId())), "Expected become leader transition not to change leader");
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(-1L)));
        verifyPushSessionEnded(remoteReplicaId(), 2L, false);
        assertReplicationSessionId(remoteReplicaId, new Some(BoxesRunTime.boxToLong(-1L)));
    }

    @Test
    public void testMakeLeaderEndsPushSessionsWhenFencingReplicas() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L);
        });
        activePushSessionOrFail(remoteReplicaId(), 2L);
        int remoteReplicaId = remoteReplicaId() + 1;
        Assertions.assertNotEquals(brokerId(), remoteReplicaId);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(BoxesRunTime.boxToBoolean(mo61metadataCache().isBrokerFenced(remoteReplicaId()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Assertions.assertFalse(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava()).setPartitionEpoch(2).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(false), offsetCheckpoints(), None$.MODULE$), "Expected become leader transition not to change leader");
        verifyPushSessionEnded(remoteReplicaId(), 2L, true);
    }

    @Test
    public void testMakeLeaderEndsPushSessionWhenReplicasShutDown() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L);
        });
        activePushSessionOrFail(remoteReplicaId(), 2L);
        int remoteReplicaId = remoteReplicaId() + 1;
        Assertions.assertNotEquals(brokerId(), remoteReplicaId);
        Mockito.when(BoxesRunTime.boxToBoolean(mo61metadataCache().isBrokerFenced(remoteReplicaId()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToBoolean(mo61metadataCache().isBrokerShuttingDown(remoteReplicaId()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Assertions.assertFalse(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava()).setPartitionEpoch(2).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(false), offsetCheckpoints(), None$.MODULE$), "Expected become leader transition not to change leader");
        verifyPushSessionEnded(remoteReplicaId(), 2L, false);
    }

    @Test
    public void testMakeLeaderEndsPushSessionWhenPromotingLinkLeader() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L);
        });
        activePushSessionOrFail(remoteReplicaId(), 2L);
        int remoteReplicaId = remoteReplicaId() + 1;
        Assertions.assertNotEquals(brokerId(), remoteReplicaId);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Assertions.assertFalse(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(2).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setClusterLinkId(clusterLinkId().toString()).setClusterLinkTopicState(TopicLinkMirror$.MODULE$.name()).setLinkedLeaderEpoch(-1).setIsNew(false), offsetCheckpoints(), None$.MODULE$), "Expected become leader transition not to change leader");
        verifyPushSessionEnded(remoteReplicaId(), 2L, true);
        fetchAsCaughtUpReplica(partition(), 1L, 3L);
    }

    @Test
    public void testAlterPartitionIsrShrinkEndsPushSession() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L);
        });
        activePushSessionOrFail(remoteReplicaId(), 2L);
        Partition partition = partition();
        partition.appendRecordsToLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, Predef$.MODULE$.int2Integer(5), new SimpleRecord[]{new SimpleRecord(Long.toString(time().milliseconds()).getBytes())}), AppendOrigin.CLIENT, 0, RequestLocal$.MODULE$.withThreadConfinedCaching(), partition.appendRecordsToLeader$default$5(), partition.appendRecordsToLeader$default$6());
        time().sleep(partition().replicaLagTimeMaxMs() + 1);
        partition().maybeShrinkIsr();
        alterPartitionManager().completeIsrUpdate(2);
        verifyPushSessionEnded(remoteReplicaId(), 2L, true);
    }

    @Test
    public void testDeletePartitionEndsPushSessions() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L);
        });
        activePushSessionOrFail(remoteReplicaId(), 2L);
        partition().delete();
        verifyPushSessionEnded(remoteReplicaId(), 2L, false);
    }

    @Test
    public void testMarkPartitionOfflineEndsPushSessions() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L);
        });
        activePushSessionOrFail(remoteReplicaId(), 2L);
        partition().markOffline();
        verifyPushSessionEnded(remoteReplicaId(), 2L, false);
    }

    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testConcurrentMakeLeaderWithCaughtUpFetch(boolean z) {
        int i = 5;
        setupPartitionWithMocks(5, true);
        Semaphore semaphore = new Semaphore(0);
        Future submit = executor().submit(() -> {
            semaphore.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 0L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(i)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        });
        Future submit2 = executor().submit(() -> {
            return BoxesRunTime.boxToBoolean($anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$2(this, semaphore, z, i));
        });
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$3(semaphore)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        semaphore.release(2);
        Assertions.assertNotEquals(BoxesRunTime.boxToBoolean(z), submit2.get(15L, TimeUnit.SECONDS), "Unexpected leadership change");
        Throwable cause = Assertions.assertThrows(Exception.class, () -> {
            submit.get(15L, TimeUnit.SECONDS);
        }).getCause();
        Assertions.assertTrue((cause instanceof FencedLeaderEpochException) || (cause instanceof PushReplicationStartedException));
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager -> {
            return mockPushManager.pushReplicationSessions();
        }).get()).isEmpty());
        if (!(cause instanceof FencedLeaderEpochException)) {
            verifyPushSessionEnded(remoteReplicaId(), 2L, true);
        }
        assertReplicationSessionId(remoteReplicaId(), z ? new Some(BoxesRunTime.boxToLong(-1L)) : None$.MODULE$);
    }

    @Test
    public void testMakeFollowerEndsReplicationSession() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertTrue(partition().makeFollower(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5 + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(2).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(false), offsetCheckpoints(), new Some(topicId())));
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v34 */
    /* JADX WARN: Type inference failed for: r0v51, types: [java.lang.Object] */
    @Test
    public void testConcurrentCaughtUpFetchAndNewReplicaEpochFetch() {
        LogManager logManager = logManager();
        AbstractLog orCreateLog = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$);
        seedLogWithData(orCreateLog);
        Assertions.assertEquals(17L, orCreateLog.logEndOffset());
        int i = 10;
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L))).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(2L)));
        setupPartitionWithMocks(10, true);
        Semaphore semaphore = new Semaphore(0);
        Future submit = executor().submit(() -> {
            semaphore.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 17L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(i)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 2L);
        });
        Future submit2 = executor().submit(() -> {
            semaphore.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 12L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(i)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(2L)), 0L);
        });
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testConcurrentCaughtUpFetchAndNewReplicaEpochFetch$3(semaphore)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testConcurrentCaughtUpFetchAndNewReplicaEpochFetch$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        semaphore.release(2);
        ExecutionException executionException = 0;
        boolean z = false;
        try {
            executionException = submit.get(15L, TimeUnit.SECONDS);
        } catch (ExecutionException unused) {
            if (executionException.getCause() instanceof PushReplicationStartedException) {
                z = true;
            }
        }
        submit2.get(15L, TimeUnit.SECONDS);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));
        Assertions.assertTrue(((TestUtils.MockPushManager) pushReplicationManager().get()).pushReplicationSessions().isEmpty());
        if (z) {
            verifyPushSessionEnded(remoteReplicaId(), 2L, false);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v33 */
    /* JADX WARN: Type inference failed for: r0v50, types: [java.lang.Object] */
    @Test
    public void testConcurrentCaughtUpFetchAndNewReplicaSessionFetch() {
        LogManager logManager = logManager();
        AbstractLog orCreateLog = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$);
        seedLogWithData(orCreateLog);
        Assertions.assertEquals(17L, orCreateLog.logEndOffset());
        int i = 10;
        setupPartitionWithMocks(10, true);
        Semaphore semaphore = new Semaphore(0);
        Future submit = executor().submit(() -> {
            semaphore.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 17L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(i)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 2L);
        });
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Future submit2 = executor().submit(() -> {
            semaphore.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 12L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(i)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 3L);
        });
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testConcurrentCaughtUpFetchAndNewReplicaSessionFetch$3(semaphore)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testConcurrentCaughtUpFetchAndNewReplicaSessionFetch$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        semaphore.release(2);
        ExecutionException executionException = 0;
        boolean z = false;
        try {
            executionException = submit.get(15L, TimeUnit.SECONDS);
        } catch (ExecutionException unused) {
            if (executionException.getCause() instanceof PushReplicationStartedException) {
                z = true;
            }
        }
        submit2.get(15L, TimeUnit.SECONDS);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(3L)));
        Assertions.assertTrue(((TestUtils.MockPushManager) pushReplicationManager().get()).pushReplicationSessions().isEmpty());
        if (z) {
            verifyPushSessionEnded(remoteReplicaId(), 2L, false);
        }
    }

    @Test
    public void testFetchTransitionsToPushWhenFullyCaughtUp() {
        LogManager logManager = logManager();
        AbstractLog orCreateLog = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$);
        seedLogWithData(orCreateLog);
        int i = 10;
        long j = 0;
        Assertions.assertTrue(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(10).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava()).setPartitionEpoch(1).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(true), offsetCheckpoints(), new Some(topicId())), "Expected become leader transition to succeed");
        Assertions.assertEquals(10, partition().getLeaderEpoch());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId()})), partition().inSyncReplicaIds());
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Assertions.assertEquals(17L, orCreateLog.logEndOffset());
        fetchFollower(partition(), remoteReplicaId(), 5L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId()})), partition().inSyncReplicaIds());
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager -> {
            return mockPushManager.pushReplicationSessions();
        }).get()).isEmpty());
        appendToPartition(partition(), 3);
        Assertions.assertEquals(20L, orCreateLog.logEndOffset());
        fetchFollower(partition(), remoteReplicaId(), 17L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        appendToPartition(partition(), 3);
        Assertions.assertEquals(20L, orCreateLog.highWatermark());
        Assertions.assertEquals(23L, orCreateLog.logEndOffset());
        fetchFollower(partition(), remoteReplicaId(), 20L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        alterPartitionManager().completeIsrUpdate(2);
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId(), remoteReplicaId()})), partition().inSyncReplicaIds());
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager2 -> {
            return mockPushManager2.pushReplicationSessions();
        }).get()).isEmpty());
        appendToPartition(partition(), 3);
        Assertions.assertEquals(26L, orCreateLog.logEndOffset());
        fetchFollower(partition(), remoteReplicaId(), 23L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager3 -> {
            return mockPushManager3.pushReplicationSessions();
        }).get()).isEmpty());
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchFollower(this.partition(), this.remoteReplicaId(), 26L, j, this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(i)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        });
        Assertions.assertFalse(((MapLike) pushReplicationManager().map(mockPushManager4 -> {
            return mockPushManager4.pushReplicationSessions();
        }).get()).isEmpty());
    }

    @Test
    public void testFetchTransitionsToPushOnlyWithDefinedTopicId() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(brokerId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Assertions.assertTrue(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(1).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(true), offsetCheckpoints(), None$.MODULE$), "Expected become leader transition to succeed");
        fetchAsCaughtUpReplica(partition(), 1L, 0L);
        Assertions.assertFalse(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5 + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(1).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(false), offsetCheckpoints(), new Some(topicId())), "Partition leader ID should not change");
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
    }

    @ValueSource(strings = {"__consumer_offsets", "__transaction_state", "_confluent-tier-state", "_confluent-link-metadata", "_confluent-quotas"})
    @ParameterizedTest
    public void testFetchDoesNotTransitionInternalTopicsToPush(String str) {
        TopicPartition topicPartition = new TopicPartition(str, 0);
        configRepository().setTopicConfig("_confluent-tier-state", "retention.ms", Integer.toString(-1));
        partition_$eq(new Partition(topicPartition, Defaults$.MODULE$.ReplicaLagTimeMaxMs(), interBrokerProtocolVersion(), brokerId(), () -> {
            return this.defaultBrokerEpoch(this.brokerId());
        }, time(), alterPartitionListener(), delayedOperations(), mo61metadataCache(), logManager(), new Some(tierReplicaManager()), None$.MODULE$, Partition$.MODULE$.$lessinit$greater$default$13(), alterPartitionManager(), None$.MODULE$, Partition$.MODULE$.$lessinit$greater$default$16(), Partition$.MODULE$.$lessinit$greater$default$17(), pushReplicationManager()));
        Mockito.when(offsetCheckpoints().fetch(ArgumentMatchers.anyString(), (TopicPartition) ArgumentMatchers.eq(topicPartition))).thenReturn(None$.MODULE$);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Assertions.assertTrue(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(1).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(true), offsetCheckpoints(), new Some(topicId())), "Expected become leader transition to succeed");
        fetchAsCaughtUpReplica(partition(), 1L, 0L);
    }

    @Test
    public void testFetchTransitionsToPushWhenInCommittedIsr() {
        LogManager logManager = logManager();
        seedLogWithData(logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$));
        int i = 10;
        long j = 0;
        Assertions.assertTrue(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(10).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava()).setPartitionEpoch(1).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(true), offsetCheckpoints(), new Some(topicId())), "Expected become leader transition to succeed");
        Assertions.assertEquals(10, partition().getLeaderEpoch());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId()})), partition().inSyncReplicaIds());
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        fetchFollower(partition(), remoteReplicaId(), 17L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        alterPartitionManager().assertInFlightLeaderAndIsr((Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId(), remoteReplicaId()})), 10, 1);
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager -> {
            return mockPushManager.pushReplicationSessions();
        }).get()).isEmpty());
        fetchFollower(partition(), remoteReplicaId(), 17L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager2 -> {
            return mockPushManager2.pushReplicationSessions();
        }).get()).isEmpty());
        alterPartitionManager().completeIsrUpdate(2);
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId(), remoteReplicaId()})), partition().inSyncReplicaIds());
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchFollower(this.partition(), this.remoteReplicaId(), 17L, j, this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(i)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        });
    }

    @Test
    public void testFetchWithInvalidSessionIdDoesNotTransitionToPush() {
        setupPartitionWithMocks(5, true);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        fetchAsCaughtUpReplica(partition(), 1L, -1L);
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager -> {
            return mockPushManager.pushReplicationSessions();
        }).get()).isEmpty());
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        Assertions.assertFalse(((MapLike) pushReplicationManager().map(mockPushManager2 -> {
            return mockPushManager2.pushReplicationSessions();
        }).get()).isEmpty());
    }

    @Test
    public void testFetchWithFinalReplicationSessionIdDoesNotTransitionToPush() {
        long j = 1;
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(Long.MAX_VALUE)));
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager -> {
            return mockPushManager.pushReplicationSessions();
        }).get()).isEmpty());
        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        Assertions.assertThrows(FencedReplicationSessionIdException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), j, 0L);
        });
    }

    @Test
    public void testFetchWithFinalReplicationSessionIdTransitionsToPullAndCanNeverTransitionBackToPush() {
        long j = 1;
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), j, 0L);
        });
        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(Long.MAX_VALUE)));
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyPushSessionEnded(new TestUtils.TopicPartitionReplica(topicPartition(), remoteReplicaId()), 0L, false);
        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager -> {
            return mockPushManager.pushReplicationSessions();
        }).get()).isEmpty());
    }

    @Test
    public void testFetchWithDivergingEpochDoesNotTransitionToPull() {
        LogManager logManager = logManager();
        seedLogWithData(logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$));
        int i = 10;
        long j = 0;
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Partition partition = setupPartitionWithMocks(10, true);
        read$1(2, 5L, true, partition, 0L, 10);
        read$1(0, 4L, true, partition, 0L, 10);
        read$1(6, 6L, true, partition, 0L, 10);
        read$1(8, 17L, true, partition, 0L, 10);
        read$1(10, 18L, true, partition, 0L, 10);
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager -> {
            return mockPushManager.pushReplicationSessions();
        }).get()).isEmpty());
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.read$1(9, 17L, false, partition, j, i);
        });
        Assertions.assertFalse(((MapLike) pushReplicationManager().map(mockPushManager2 -> {
            return mockPushManager2.pushReplicationSessions();
        }).get()).isEmpty());
    }

    @Test
    public void testFetchWithNewReplicaEpochTransitionsToPull() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L))).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(5L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 1L);
        });
        fetchAsCaughtUpReplica(partition(), 5L, 0L);
        verifyPushSessionEnded(remoteReplicaId(), 1L, false);
        Assertions.assertThrows(NotLeaderOrFollowerException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
    }

    @Test
    public void testFetchWithNewReplicaEpochUpdatesReplicationSessionId() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        fetchAsCaughtUpReplica(partition(), 1L, 1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));
        fetchAsCaughtUpReplica(partition(), 2L, 0L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 2L, 1L);
        });
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));
        fetchAsCaughtUpReplica(partition(), 3L, 0L);
        verifyPushSessionEnded(remoteReplicaId(), 1L, false);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));
    }

    @Test
    public void testFetchWithNewReplicationSessionTransitionsToPull() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        fetchAsCaughtUpReplica(partition(), 1L, 1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
        Assertions.assertThrows(FencedReplicationSessionIdException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        Assertions.assertTrue(((MapLike) pushReplicationManager().map(mockPushManager -> {
            return mockPushManager.pushReplicationSessions();
        }).get()).isEmpty());
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 1L);
        });
        activePushSessionOrFail(remoteReplicaId(), 1L);
    }

    @Test
    public void testFollowerFetchAfterTransitionThrowsPushReplicationStartedException() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        activePushSessionOrFail(remoteReplicaId(), 0L);
    }

    @Test
    public void testFollowerFetchWithTransitionUpdatesFetchStateAndIncrementsHighWatermark() {
        LogManager logManager = logManager();
        AbstractLog orCreateLog = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$);
        seedLogWithData(orCreateLog);
        Partition partition = setupPartitionWithMocks(10, true);
        Assertions.assertEquals(0L, orCreateLog.highWatermark());
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        fetchFollower(partition, remoteReplicaId(), 5L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        Assertions.assertEquals(5L, orCreateLog.highWatermark());
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(partition, 1L, 0L);
        });
        Assertions.assertEquals(orCreateLog.logEndOffset(), orCreateLog.highWatermark());
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderHwmEvent(partition.topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), orCreateLog.highWatermark());
    }

    @Test
    public void testUpdateFollowerFetchStateFencesStaleFetches() {
        LogManager logManager = logManager();
        AbstractLog orCreateLog = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$);
        seedLogWithData(orCreateLog);
        Partition partition = setupPartitionWithMocks(10, true);
        long j = 3;
        long j2 = 4;
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(3L)));
        fetchFollower(partition, remoteReplicaId(), 5L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(3L)), 4L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(4L)));
        Assertions.assertThrows(FencedLeaderEpochException.class, () -> {
            this.updateFetchState$1(this.remoteReplicaId(), Optional.of(Predef$.MODULE$.int2Integer(9)), j, j2, partition, orCreateLog);
        });
        Assertions.assertThrows(FencedReplicationSessionIdException.class, () -> {
            this.updateFetchState$1(this.remoteReplicaId(), Optional.of(Predef$.MODULE$.int2Integer(10)), j, j2 - 1, partition, orCreateLog);
        });
    }

    @Test
    public void testOnPushSessionEndedForStaleSession() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        PushSession activePushSessionOrFail = activePushSessionOrFail(remoteReplicaId(), 0L);
        fetchAsCaughtUpReplica(partition(), 1L, 1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));
        activePushSessionOrFail.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 1L);
        });
        PushSession activePushSessionOrFail2 = activePushSessionOrFail(remoteReplicaId(), 1L);
        fetchAsCaughtUpReplica(partition(), 1L, 2L);
        verifyPushSessionEnded(remoteReplicaId(), 1L, false);
        activePushSessionOrFail2.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(2L)));
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L);
        });
        PushSession activePushSessionOrFail3 = activePushSessionOrFail(remoteReplicaId(), 2L);
        Assertions.assertFalse(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5 + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(2).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(false), offsetCheckpoints(), new Some(topicId())), "Expected become leader not to change leadership");
        activePushSessionOrFail3.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(-1L)));
    }

    @Test
    public void testOnPushSessionEndedUpdatesReplicationSession() {
        setupPartitionWithMocks(5, true);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        PushSession activePushSessionOrFail = activePushSessionOrFail(remoteReplicaId(), 0L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));
        activePushSessionOrFail.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));
        assertReplicationSessionMode(remoteReplicaId(), ReplicationState.Mode.PULL);
    }

    @Test
    public void testOnPushSessionEndedWhenPartitionDeleted() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        PushSession activePushSessionOrFail = activePushSessionOrFail(remoteReplicaId(), 0L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));
        partition().delete();
        activePushSessionOrFail.onPushSessionEnded();
    }

    @Test
    public void testOnAppendRecordsResponseUpdatesHighWatermark() {
        LogManager logManager = logManager();
        AbstractLog orCreateLog = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$);
        seedLogWithData(orCreateLog);
        setupPartitionWithMocks(10, true);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        PushSession activePushSessionOrFail = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, orCreateLog.highWatermark());
        appendToPartition(partition(), 5);
        activePushSessionOrFail.onAppendRecordsResponse(19L, 3L);
        Assertions.assertEquals(19L, orCreateLog.highWatermark());
        Assertions.assertEquals(0L, partition().lowWatermarkIfLeader());
        time().sleep(partition().replicaLagTimeMaxMs() + 1);
        partition().maybeShrinkIsr();
        alterPartitionManager().completeIsrUpdate(2);
        activePushSessionOrFail.onAppendRecordsResponse(orCreateLog.logEndOffset(), 3L);
        Assertions.assertTrue(alterPartitionManager().isrUpdates().isEmpty());
    }

    @Test
    public void testOnAppendRecordsResponseUpdatesLowWatermark() {
        LogManager logManager = logManager();
        AbstractLog orCreateLog = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$);
        seedLogWithData(orCreateLog);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(BoxesRunTime.boxToBoolean(mo61metadataCache().hasAliveBroker(remoteReplicaId()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        setupPartitionWithMocks(10, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        PushSession activePushSessionOrFail = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, orCreateLog.highWatermark());
        LogDeleteRecordsResult deleteRecordsOnLeader = partition().deleteRecordsOnLeader(5L);
        Assertions.assertEquals(5L, deleteRecordsOnLeader.requestedOffset());
        Assertions.assertEquals(0L, deleteRecordsOnLeader.lowWatermark());
        activePushSessionOrFail.onAppendRecordsResponse(17L, 5L);
        Assertions.assertEquals(5L, partition().lowWatermarkIfLeader());
    }

    @Test
    public void testOnAppendRecordsResponseCompletesPurgatory() {
        LogManager logManager = logManager();
        AbstractLog orCreateLog = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$);
        seedLogWithData(orCreateLog);
        setupPartitionWithMocks(10, true);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(BoxesRunTime.boxToBoolean(mo61metadataCache().hasAliveBroker(remoteReplicaId()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        PushSession activePushSessionOrFail = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, orCreateLog.highWatermark());
        appendToPartition(partition(), 5);
        activePushSessionOrFail.onAppendRecordsResponse(17L, 0L);
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.never())).checkAndCompleteAll();
        activePushSessionOrFail.onAppendRecordsResponse(19L, 0L);
        Assertions.assertEquals(19L, orCreateLog.highWatermark());
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.times(1))).checkAndCompleteAll();
        Mockito.reset(new DelayedOperations[]{delayedOperations()});
        activePushSessionOrFail.onAppendRecordsResponse(19L, 3L);
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.times(0))).checkAndCompleteAll();
        Mockito.when(BoxesRunTime.boxToInteger(delayedOperations().numDelayedDelete())).thenReturn(BoxesRunTime.boxToInteger(1));
        partition().deleteRecordsOnLeader(5L);
        Assertions.assertEquals(3L, partition().lowWatermarkIfLeader());
        activePushSessionOrFail.onAppendRecordsResponse(19L, 5L);
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.times(1))).checkAndCompleteAll();
    }

    @Test
    public void testOnAppendRecordsResponseWithStalePushSession() {
        LogManager logManager = logManager();
        AbstractLog orCreateLog = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$);
        seedLogWithData(orCreateLog);
        setupPartitionWithMocks(10, true);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        PushSession activePushSessionOrFail = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, orCreateLog.highWatermark());
        appendToPartition(partition(), 5);
        Assertions.assertEquals(22L, orCreateLog.logEndOffset());
        activePushSessionOrFail.onAppendRecordsResponse(19L, 0L);
        Assertions.assertEquals(19L, orCreateLog.highWatermark());
        fetchFollower(partition(), remoteReplicaId(), 17L, fetchFollower$default$4(), fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
        activePushSessionOrFail.onAppendRecordsResponse(21L, 3L);
        Assertions.assertEquals(19L, orCreateLog.highWatermark());
        fetchFollower(partition(), remoteReplicaId(), 17L, fetchFollower$default$4(), fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(2L)), 0L);
        activePushSessionOrFail.onAppendRecordsResponse(21L, 3L);
        Assertions.assertEquals(19L, orCreateLog.highWatermark());
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 2L, 0L);
        });
        PushSession activePushSessionOrFail2 = activePushSessionOrFail(remoteReplicaId(), 0L);
        appendToPartition(partition(), 5);
        Assertions.assertEquals(27L, orCreateLog.logEndOffset());
        activePushSessionOrFail2.onAppendRecordsResponse(23L, 3L);
        Assertions.assertEquals(23L, orCreateLog.highWatermark());
        Assertions.assertFalse(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(10 + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(2).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(false), offsetCheckpoints(), new Some(topicId())), "Expected become leader not to change leadership");
        activePushSessionOrFail2.onAppendRecordsResponse(25L, 3L);
        Assertions.assertEquals(23L, orCreateLog.highWatermark());
    }

    @Test
    public void testOnAppendRecordsResponseWithDeletedPartition() {
        LogManager logManager = logManager();
        AbstractLog orCreateLog = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$);
        seedLogWithData(orCreateLog);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(BoxesRunTime.boxToBoolean(mo61metadataCache().hasAliveBroker(remoteReplicaId()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        setupPartitionWithMocks(10, true);
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        PushSession activePushSessionOrFail = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, orCreateLog.highWatermark());
        appendToPartition(partition(), 5);
        Assertions.assertEquals(22L, orCreateLog.logEndOffset());
        partition().delete();
        activePushSessionOrFail.onAppendRecordsResponse(22L, 0L);
    }

    private void seedLogWithData(AbstractLog abstractLog) {
        appendToLog(abstractLog, 0, 2);
        appendToLog(abstractLog, 3, 3);
        appendToLog(abstractLog, 3, 3);
        appendToLog(abstractLog, 4, 5);
        appendToLog(abstractLog, 7, 1);
        appendToLog(abstractLog, 9, 3);
        Assertions.assertEquals(17L, abstractLog.logEndOffset());
    }

    @Test
    public void testLeaderLogAppendListenerNotifiesPushReplicasForLeaderAppends() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        int remoteReplicaId = remoteReplicaId() + 1;
        Assertions.assertFalse(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5 + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setPartitionEpoch(partition().getPartitionEpoch() + 1).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(false), offsetCheckpoints(), new Some(topicId())), "Expected become leader transition not to change leader");
        AbstractLog localLogOrException = partition().localLogOrException();
        appendToPartition(partition(), 5);
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        Assertions.assertEquals(5, localLogOrException.logEndOffset());
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderAppendEvent(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), 0L, 5, appendToPartition(partition(), 5));
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        int i = 5 + 5;
        Assertions.assertEquals(i, localLogOrException.logEndOffset());
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId)).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(3L)));
        fetchFollower(partition(), remoteReplicaId, 3L, fetchFollower$default$4(), fetchFollower$default$5(), fetchFollower$default$6(), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), fetchFollower$default$10(), fetchFollower$default$11());
        Assertions.assertEquals(3L, partition().localLogOrException().highWatermark());
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderAppendEvent(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), 3L, i, appendToPartition(partition(), 5));
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        Assertions.assertEquals(i + 5, localLogOrException.logEndOffset());
    }

    @Test
    public void testLeaderOffsetsListenerNotifiesPushReplicasForLogStartOffsetIncrement() {
        int remoteReplicaId = remoteReplicaId() + 1;
        Assertions.assertTrue(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(partition().getPartitionEpoch()).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(true), offsetCheckpoints(), new Some(topicId())), "Expected become leader transition to succeed");
        appendToPartition(partition(), 5);
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(0L)));
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 0L, 0L);
        });
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).map(obj -> {
            return $anonfun$testLeaderOffsetsListenerNotifiesPushReplicasForLogStartOffsetIncrement$2(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp(i -> {
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyLeaderLsoEvent(this.topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.remoteReplicaId())})), i);
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyNoLeaderLsoEvents(this.topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        });
    }

    @Test
    public void testLeaderOffsetsListenerNotifiesPushReplicasForHighWatermarkIncrement() {
        int remoteReplicaId = remoteReplicaId() + 1;
        Assertions.assertTrue(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(partition().getPartitionEpoch()).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(true), offsetCheckpoints(), new Some(topicId())), "Expected become leader transition to succeed");
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(0L)));
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 0L, 0L);
        });
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderHwmEvent(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), 0L);
        PushSession activePushSessionOrFail = activePushSessionOrFail(remoteReplicaId(), 0L);
        appendToPartition(partition(), 5);
        Assertions.assertEquals(partition().localLogOrException().highWatermark(), 0L);
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp(i -> {
            activePushSessionOrFail.onAppendRecordsResponse(i, this.partition().logStartOffset());
            Assertions.assertEquals(i, this.partition().localLogOrException().highWatermark());
        });
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp(i2 -> {
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyLeaderHwmEvent(this.topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.remoteReplicaId())})), i2);
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyNoLeaderHwmEvents(this.topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        });
    }

    @Test
    public void testLeaderLogAppendListenerIgnoresFollowerEvents() {
        setupPartitionWithMocks(5, false);
        partition().appendRecordsToFollower(0L, AppendOrigin.REPLICATION, -1L, Optional.of(Predef$.MODULE$.int2Integer(5)), MemoryRecords.withRecords(0L, CompressionType.NONE, Predef$.MODULE$.int2Integer(5), (SimpleRecord[]) ((IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).map(obj -> {
            return $anonfun$testLeaderLogAppendListenerIgnoresFollowerEvents$1(BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(SimpleRecord.class))), 3L, Optional.empty());
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId())})));
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderHwmEvents(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId())})));
        partition().localLogOrException().maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.LeaderOffsetIncremented);
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderLsoEvents(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId())})));
    }

    @Test
    public void testLeaderLogAppendListenerIncludesTransactionMarkerAppends() {
        Mockito.when(mo61metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);
        int remoteReplicaId = remoteReplicaId() + 1;
        Assertions.assertFalse(partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5 + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setPartitionEpoch(partition().getPartitionEpoch() + 1).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(false), offsetCheckpoints(), new Some(topicId())), "Expected become leader transition not to change leader");
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        AbstractLog localLogOrException = partition().localLogOrException();
        Object maybeStartTransactionVerification = partition().maybeStartTransactionVerification(1L, 0, (short) 0);
        AbstractRecords createTransactionalRecords = createTransactionalRecords(new $colon.colon(new SimpleRecord("k1".getBytes(), "v1".getBytes()), Nil$.MODULE$), 0L, createTransactionalRecords$default$3(), 1L);
        partition().appendRecordsToLeader(createTransactionalRecords, AppendOrigin.CLIENT, -1, RequestLocal$.MODULE$.withThreadConfinedCaching(), time().milliseconds(), maybeStartTransactionVerification);
        int size = ((TraversableOnce) CollectionConverters$.MODULE$.iterableAsScalaIterableConverter(createTransactionalRecords.records()).asScala()).size();
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderAppendEvent(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), 0L, 0L, createTransactionalRecords);
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        Assertions.assertEquals(size, localLogOrException.logEndOffset());
        AbstractRecords withEndTransactionMarker = MemoryRecords.withEndTransactionMarker(1L, (short) 0, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
        Partition partition = partition();
        partition.appendRecordsToLeader(withEndTransactionMarker, AppendOrigin.COORDINATOR, -1, RequestLocal$.MODULE$.withThreadConfinedCaching(), partition.appendRecordsToLeader$default$5(), partition.appendRecordsToLeader$default$6());
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderAppendEvent(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), 0L, size, withEndTransactionMarker);
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicPartition(), (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        Assertions.assertEquals(size + ((TraversableOnce) CollectionConverters$.MODULE$.iterableAsScalaIterableConverter(withEndTransactionMarker.records()).asScala()).size(), localLogOrException.logEndOffset());
    }

    private void appendToLog(AbstractLog abstractLog, int i, int i2) {
        abstractLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, Predef$.MODULE$.int2Integer(i), (SimpleRecord[]) ((IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), i2).map(obj -> {
            return $anonfun$appendToLog$1(BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(SimpleRecord.class))), i, abstractLog.appendAsLeader$default$3(), abstractLog.appendAsLeader$default$4(), abstractLog.appendAsLeader$default$5(), abstractLog.appendAsLeader$default$6());
    }

    private MemoryRecords appendToPartition(Partition partition, int i) {
        MemoryRecords withRecords = MemoryRecords.withRecords(0L, CompressionType.NONE, (SimpleRecord[]) ((IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), i).map(obj -> {
            return $anonfun$appendToPartition$1(BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(SimpleRecord.class)));
        partition.appendRecordsToLeader(withRecords, AppendOrigin.CLIENT, 1, RequestLocal$.MODULE$.NoCaching(), partition.appendRecordsToLeader$default$5(), partition.appendRecordsToLeader$default$6());
        return withRecords;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public LogReadInfo fetchAsCaughtUpReplica(Partition partition, long j, long j2) {
        AbstractLog localLogOrException = partition.localLogOrException();
        return fetchFollower(partition, remoteReplicaId(), localLogOrException.logEndOffset(), fetchFollower$default$4(), fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(partition.getLeaderEpoch())), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(j)), j2);
    }

    private void assertReplicationSessionId(int i, Option<Object> option) {
        Assertions.assertEquals(option, partition().getReplica(i).map(replica -> {
            return BoxesRunTime.boxToLong($anonfun$assertReplicationSessionId$1(replica));
        }));
    }

    private void assertReplicationSessionMode(int i, ReplicationState.Mode mode) {
        Assertions.assertEquals(mode, ((Replica) partition().getReplica(i).getOrElse(() -> {
            return (Nothing$) Assertions.fail("Expected replica to exist");
        })).stateSnapshot().replicationSessionState().mode());
    }

    private PushSession activePushSessionOrFail(int i, long j) {
        Assertions.assertTrue(partition().remotePushReplicas().contains(BoxesRunTime.boxToInteger(i)));
        return ((TestUtils.MockPushManager) pushReplicationManager().get()).activePushSessionOrFail(new TestUtils.TopicPartitionReplica(topicPartition(), i), j);
    }

    private void verifyPushSessionEnded(int i, long j, boolean z) {
        Assertions.assertFalse(partition().remotePushReplicas().contains(BoxesRunTime.boxToInteger(i)));
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyPushSessionEnded(new TestUtils.TopicPartitionReplica(topicPartition(), remoteReplicaId()), j, z);
    }

    public static final /* synthetic */ boolean $anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$2(PushReplicationPartitionTest pushReplicationPartitionTest, Semaphore semaphore, boolean z, int i) {
        semaphore.acquire();
        return z ? pushReplicationPartitionTest.partition().makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(pushReplicationPartitionTest.brokerId()).setLeaderEpoch(i + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(2).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(false), pushReplicationPartitionTest.offsetCheckpoints(), new Some(pushReplicationPartitionTest.topicId())) : pushReplicationPartitionTest.partition().makeFollower(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(pushReplicationPartitionTest.remoteReplicaId()).setLeaderEpoch(i + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(2).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(false), pushReplicationPartitionTest.offsetCheckpoints(), new Some(pushReplicationPartitionTest.topicId()));
    }

    public static final /* synthetic */ boolean $anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$3(Semaphore semaphore) {
        return semaphore.getQueueLength() == 2;
    }

    public static final /* synthetic */ String $anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$4() {
        return "Timed out waiting for threads to prepare.";
    }

    public static final /* synthetic */ boolean $anonfun$testConcurrentCaughtUpFetchAndNewReplicaEpochFetch$3(Semaphore semaphore) {
        return semaphore.getQueueLength() == 2;
    }

    public static final /* synthetic */ String $anonfun$testConcurrentCaughtUpFetchAndNewReplicaEpochFetch$4() {
        return "Timed out waiting for threads to prepare.";
    }

    public static final /* synthetic */ boolean $anonfun$testConcurrentCaughtUpFetchAndNewReplicaSessionFetch$3(Semaphore semaphore) {
        return semaphore.getQueueLength() == 2;
    }

    public static final /* synthetic */ String $anonfun$testConcurrentCaughtUpFetchAndNewReplicaSessionFetch$4() {
        return "Timed out waiting for threads to prepare.";
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final LogReadInfo read$1(int i, long j, boolean z, Partition partition, long j2, int i2) {
        int remoteReplicaId = remoteReplicaId();
        Some some = new Some(BoxesRunTime.boxToInteger(i2));
        Some some2 = new Some(BoxesRunTime.boxToLong(1L));
        LogReadInfo fetchFollower = fetchFollower(partition, remoteReplicaId, j, j2, fetchFollower$default$5(), some, new Some(BoxesRunTime.boxToInteger(i)), fetchFollower$default$8(), fetchFollower$default$9(), some2, 0L);
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z), BoxesRunTime.boxToBoolean(fetchFollower.divergingEpoch.isPresent()));
        return fetchFollower;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void updateFetchState$1(int i, Optional optional, long j, long j2, Partition partition, AbstractLog abstractLog) {
        partition.updateFollowerFetchState((Replica) partition.getReplica(i).getOrElse(() -> {
            return (Nothing$) Assertions.fail("Expected replica to be defined");
        }), new LogOffsetMetadata(7L), 0L, time().milliseconds(), abstractLog.logEndOffset(), j, optional, j2);
    }

    public static final /* synthetic */ LogDeleteRecordsResult $anonfun$testLeaderOffsetsListenerNotifiesPushReplicasForLogStartOffsetIncrement$2(PushReplicationPartitionTest pushReplicationPartitionTest, int i) {
        return pushReplicationPartitionTest.partition().deleteRecordsOnLeader(i);
    }

    public static final /* synthetic */ SimpleRecord $anonfun$testLeaderLogAppendListenerIgnoresFollowerEvents$1(int i) {
        return new SimpleRecord(String.valueOf(BoxesRunTime.boxToInteger(i)).getBytes());
    }

    public static final /* synthetic */ SimpleRecord $anonfun$appendToLog$1(int i) {
        return new SimpleRecord(String.valueOf(BoxesRunTime.boxToInteger(i)).getBytes());
    }

    public static final /* synthetic */ SimpleRecord $anonfun$appendToPartition$1(int i) {
        return new SimpleRecord(String.valueOf(BoxesRunTime.boxToInteger(i)).getBytes());
    }

    public static final /* synthetic */ long $anonfun$assertReplicationSessionId$1(Replica replica) {
        return replica.stateSnapshot().replicationSessionState().replicationSessionId();
    }
}
