package kafka.cluster;

import io.confluent.kafka.replication.push.PushManager;
import io.confluent.kafka.replication.push.PushSession;
import io.confluent.kafka.replication.push.ReplicationState;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import kafka.cluster.AbstractPartitionTest;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.LogDeleteRecordsResult;
import kafka.server.RequestLocal$;
import kafka.server.link.TopicLinkMirror$;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.FencedReplicationSessionIdException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.PushReplicationStartedException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogReadInfo;
import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;

/* compiled from: PushReplicationPartitionTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\t\u001dh\u0001B\u001d;\u0001}BQ\u0001\u0012\u0001\u0005\u0002\u0015CQa\u0012\u0001\u0005B!Cqa\u0018\u0001C\u0002\u0013\u0005\u0003\r\u0003\u0004j\u0001\u0001\u0006I!\u0019\u0005\bU\u0002\u0011\r\u0011\"\u0001l\u0011\u0019A\b\u0001)A\u0005Y\")\u0011\u0010\u0001C\u0001u\"1\u00111\u0003\u0001\u0005\u0002iDa!a\u0006\u0001\t\u0003Q\bBBA\u000e\u0001\u0011\u0005!\u0010\u0003\u0004\u0002 \u0001!\tA\u001f\u0005\u0007\u0003G\u0001A\u0011\u0001>\t\r\u0005\u001d\u0002\u0001\"\u0001{\u0011\u0019\tY\u0003\u0001C\u0001u\"9\u0011q\u0006\u0001\u0005\u0002\u0005E\u0002BBA1\u0001\u0011\u0005!\u0010\u0003\u0004\u0002f\u0001!\tA\u001f\u0005\u0007\u0003S\u0002A\u0011\u0001>\t\r\u00055\u0004\u0001\"\u0001{\u0011\u0019\t\t\b\u0001C\u0001u\"9\u0011Q\u000f\u0001\u0005\u0002\u0005]\u0004bBAU\u0001\u0011\u0005\u00111\u0016\u0005\b\u0003k\u0003A\u0011BA\\\u0011\u0019\tY\u000e\u0001C\u0001u\"1\u0011q\u001c\u0001\u0005\u0002iDa!a9\u0001\t\u0003Q\bBBAt\u0001\u0011\u0005!\u0010\u0003\u0004\u0002l\u0002!\tA\u001f\u0005\u0007\u0003_\u0004A\u0011\u0001>\t\r\u0005M\b\u0001\"\u0001{\u0011\u0019\t9\u0010\u0001C\u0001u\"1\u00111 
\u0001\u0005\u0002iDa!a@\u0001\t\u0003Q\bB\u0002B\u0002\u0001\u0011\u0005!\u0010\u0003\u0004\u0003\b\u0001!\tA\u001f\u0005\u0007\u0005\u0017\u0001A\u0011\u0001>\t\r\t=\u0001\u0001\"\u0001{\u0011\u0019\u0011\u0019\u0002\u0001C\u0001u\"1!q\u0003\u0001\u0005\u0002iDaAa\u0007\u0001\t\u0003Q\bB\u0002B\u0010\u0001\u0011\u0005!\u0010\u0003\u0004\u0003$\u0001!\tA\u001f\u0005\u0007\u0005O\u0001A\u0011\u0001>\t\u000f\t-\u0002\u0001\"\u0003\u0003.!1!Q\b\u0001\u0005\u0002iDaA!\u0011\u0001\t\u0003Q\bB\u0002B#\u0001\u0011\u0005!\u0010\u0003\u0004\u0003J\u0001!\tA\u001f\u0005\u0007\u0005\u001b\u0002A\u0011\u0001>\t\u000f\tE\u0003\u0001\"\u0003\u0003T!9!Q\r\u0001\u0005\n\t\u001d\u0004b\u0002BB\u0001\u0011%!Q\u0011\u0005\b\u0005S\u0003A\u0011\u0002BV\u0011\u001d\u0011)\f\u0001C\u0005\u0005oCqA!4\u0001\t\u0013\u0011y\rC\u0004\u0003\\\u0002!IA!8\u00039A+8\u000f\u001b*fa2L7-\u0019;j_:\u0004\u0016M\u001d;ji&|g\u000eV3ti*\u00111\bP\u0001\bG2,8\u000f^3s\u0015\u0005i\u0014!B6bM.\f7\u0001A\n\u0003\u0001\u0001\u0003\"!\u0011\"\u000e\u0003iJ!a\u0011\u001e\u0003+\u0005\u00137\u000f\u001e:bGR\u0004\u0016M\u001d;ji&|g\u000eV3ti\u00061A(\u001b8jiz\"\u0012A\u0012\t\u0003\u0003\u0002\tAd\u0019:fCR,\u0007+^:i%\u0016\u0004H.[2bi&|g.T1oC\u001e,'/F\u0001J!\rQUjT\u0007\u0002\u0017*\tA*A\u0003tG\u0006d\u0017-\u0003\u0002O\u0017\n1q\n\u001d;j_:\u0004\"\u0001\u0015/\u000f\u0005EKfB\u0001*X\u001d\t\u0019f+D\u0001U\u0015\t)f(\u0001\u0004=e>|GOP\u0005\u0002{%\u0011\u0001\fP\u0001\u0006kRLGn]\u0005\u00035n\u000b\u0011\u0002V3tiV#\u0018\u000e\\:\u000b\u0005ac\u0014BA/_\u0005=iunY6QkNDW*\u00198bO\u0016\u0014(B\u0001.\\\u00035iW\r^1eCR\f7)Y2iKV\t\u0011\r\u0005\u0002cO6\t1M\u0003\u0002eK\u0006AQ.\u001a;bI\u0006$\u0018M\u0003\u0002gy\u000511/\u001a:wKJL!\u0001[2\u0003%-\u0013\u0016M\u001a;NKR\fG-\u0019;b\u0007\u0006\u001c\u0007.Z\u0001\u000f[\u0016$\u0018\rZ1uC\u000e\u000b7\r[3!\u0003A!x\u000e]5d\u0013\u0012\u0004\u0016M\u001d;ji&|g.F\u0001m!\tig/D\u0001o\u0015\ty\u0007/\u0001\u0004d_6lwN\u001c\u0006\u0003MFT!!\u00
10:\u000b\u0005M$\u0018AB1qC\u000eDWMC\u0001v\u0003\ry'oZ\u0005\u0003o:\u0014\u0001\u0003V8qS\u000eLE\rU1si&$\u0018n\u001c8\u0002#Q|\u0007/[2JIB\u000b'\u000f^5uS>t\u0007%\u0001\u001cuKN$X*Y6f\u0019\u0016\fG-\u001a:SKN,Go\u001d*fa2L7-\u0019;j_:\u001cVm]:j_:|eNT3x\u0019\u0016\fG-\u001a:Fa>\u001c\u0007\u000eF\u0001|!\tQE0\u0003\u0002~\u0017\n!QK\\5uQ\t9q\u0010\u0005\u0003\u0002\u0002\u0005=QBAA\u0002\u0015\u0011\t)!a\u0002\u0002\u0007\u0005\u0004\u0018N\u0003\u0003\u0002\n\u0005-\u0011a\u00026va&$XM\u001d\u0006\u0004\u0003\u001b!\u0018!\u00026v]&$\u0018\u0002BA\t\u0003\u0007\u0011A\u0001V3ti\u0006\tD/Z:u\u001b\u0006\\W\rT3bI\u0016\u0014XI\u001c3t!V\u001c\bnU3tg&|gn],iK:4UM\\2j]\u001e\u0014V\r\u001d7jG\u0006\u001c\bF\u0001\u0005��\u0003E\"Xm\u001d;NC.,G*Z1eKJ,e\u000eZ:QkND7+Z:tS>tw\u000b[3o%\u0016\u0004H.[2bgNCW\u000f\u001e#po:D#!C@\u0002iQ,7\u000f^'bW\u0016dU-\u00193fe\u0016sGm\u001d)vg\"\u001cVm]:j_:<\u0006.\u001a8Qe>lw\u000e^5oO2Kgn\u001b'fC\u0012,'\u000f\u000b\u0002\u000b\u007f\u0006ID/Z:u\r\u0016$8\r[!gi\u0016\u0014\b+^:i%\u0016\u0004H.[2bi&|g\u000eR5tC\ndW\rZ!oI\n\u0013xn[3s\u000bB|7\r\u001b\"v[B,G\r\u000b\u0002\f\u007f\u0006QC/Z:u\u00032$XM\u001d)beRLG/[8o\u0013N\u00148\u000b\u001b:j].,e\u000eZ:QkND7+Z:tS>t\u0007F\u0001\u0007��\u0003\r\"Xm\u001d;EK2,G/\u001a)beRLG/[8o\u000b:$7\u000fU;tQN+7o]5p]ND#!D@\u0002QQ,7\u000f^'be.\u0004\u0016M\u001d;ji&|gn\u00144gY&tW-\u00128egB+8\u000f[*fgNLwN\\:)\u00059y\u0018!\u000b;fgR\u001cuN\\2veJ,g\u000e^'bW\u0016dU-\u00193fe^KG\u000f[\"bk\u001eDG/\u00169GKR\u001c\u0007\u000eF\u0002|\u0003gAq!!\u000e\u0010\u0001\u0004\t9$\u0001\u0006nC.,G*Z1eKJ\u00042ASA\u001d\u0013\r\tYd\u0013\u0002\b\u0005>|G.Z1oQ\ry\u0011q\b\t\u0005\u0003\u0003\n9%\u0004\u0002\u0002D)!\u0011QIA\u0004\u0003\u0019\u0001\u0018M]1ng&!\u0011\u0011JA\"\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f\u001e\u0015\b\u001f\u00055\u0013\u0011LA.!\u0011\ty%!\u0016\u000e\u0005\u0005E#\u0002BA*\u0003\u0007\n\u0001\u0002\u001d:pm&$WM]\u0005\u0005\u0003/\n\tFA\u0006WC2,XmU8ve\u000
e,\u0017\u0001\u00032p_2,\u0017M\\:-\t\u0005u\u0013qL\r\u0002\u0001e\t\u0011!\u0001\u0014uKN$X*Y6f\r>dGn\\<fe\u0016sGm\u001d*fa2L7-\u0019;j_:\u001cVm]:j_:D#\u0001E@\u0002eQ,7\u000f^\"p]\u000e,(O]3oi\u000e\u000bWo\u001a5u+B4U\r^2i\u0003:$g*Z<SKBd\u0017nY1Fa>\u001c\u0007NR3uG\"D#!E@\u0002iQ,7\u000f^\"p]\u000e,(O]3oi\u000e\u000bWo\u001a5u+B4U\r^2i\u0003:$g*Z<SKBd\u0017nY1TKN\u001c\u0018n\u001c8GKR\u001c\u0007\u000e\u000b\u0002\u0013\u007f\u0006YC/Z:u\r\u0016$8\r\u001b+sC:\u001c\u0018\u000e^5p]N$v\u000eU;tQ^CWM\u001c$vY2L8)Y;hQR,\u0006\u000f\u000b\u0002\u0014\u007f\u0006\u0001D/Z:u\r\u0016$8\r\u001b+sC:\u001c\u0018\u000e^5p]N$v\u000eU;tQ>sG._,ji\"$UMZ5oK\u0012$v\u000e]5d\u0013\u0012D#\u0001F@\u0002oQ,7\u000f\u001e$fi\u000eDGi\\3t\u001d>$HK]1og&$\u0018n\u001c8J]R,'O\\1m)>\u0004\u0018nY:U_B+8\u000f\u001b\"z\t\u00164\u0017-\u001e7u)\rY\u0018\u0011\u0010\u0005\b\u0003w*\u0002\u0019AA?\u0003\u0015!x\u000e]5d!\u0011\ty(a\"\u000f\t\u0005\u0005\u00151\u0011\t\u0003'.K1!!\"L\u0003\u0019\u0001&/\u001a3fM&!\u0011\u0011RAF\u0005\u0019\u0019FO]5oO*\u0019\u0011QQ&)\u0007U\ty\u0004K\u0004\u0016\u0003\u001b\n\t*a%\u0002\u000fM$(/\u001b8hg2R\u0011QSAM\u0003;\u000b\t+!*\"\u0005\u0005]\u0015AE0`G>t7/^7fe~{gMZ:fiN\f#!a'\u0002'}{FO]1og\u0006\u001cG/[8o?N$\u0018\r^3\"\u0005\u0005}\u0015!F0d_:4G.^3oi6\"\u0018.\u001a:.gR\fG/Z\u0011\u0003\u0003G\u000b\u0001dX2p]\u001adW/\u001a8u[1Lgn[\u0017nKR\fG-\u0019;bC\t\t9+A\t`G>tg\r\\;f]Rl\u0013/^8uCN\f1\u0007^3ti\u001a+Go\u00195Ue\u0006t7/\u001b;j_:\u001c\u0018J\u001c;fe:\fG\u000eV8qS\u000e\u001cHk\u001c)vg\"<\u0006.\u001a8F]\u0006\u0014G.\u001a3\u0015\u0007m\fi\u000bC\u0004\u0002|Y\u0001\r!! 
)\u0007Y\ty\u0004K\u0004\u0017\u0003\u001b\n\t*a--\u0015\u0005U\u0015\u0011TAO\u0003C\u000b)+A\u000etKR,\b/\u00138uKJt\u0017\r\u001c+pa&\u001c\u0007+\u0019:uSRLwN\u001c\u000b\u0006w\u0006e\u00161\u0018\u0005\b\u0003w:\u0002\u0019AA?\u0011\u001d\til\u0006a\u0001\u0003\u007f\u000b1\u0002];tQ6\u000bg.Y4feB!!*TAa!\u0011\t\u0019-a6\u000e\u0005\u0005\u0015'\u0002BAd\u0003\u0013\fA\u0001];tQ*!\u00111ZAg\u0003-\u0011X\r\u001d7jG\u0006$\u0018n\u001c8\u000b\u0007u\nyM\u0003\u0003\u0002R\u0006M\u0017!C2p]\u001adW/\u001a8u\u0015\t\t).\u0001\u0002j_&!\u0011\u0011\\Ac\u0005-\u0001Vo\u001d5NC:\fw-\u001a:\u0002YQ,7\u000f\u001e$fi\u000eDGK]1og&$\u0018n\u001c8t)>\u0004Vo\u001d5XQ\u0016t\u0017J\\\"p[6LG\u000f^3e\u0013N\u0014\bF\u0001\r��\u0003Q\"Xm\u001d;GKR\u001c\u0007nV5uQ&sg/\u00197jIN+7o]5p]&#Gi\\3t\u001d>$HK]1og&$\u0018n\u001c8U_B+8\u000f\u001b\u0015\u00033}\fQ\b^3ti\u001a+Go\u00195XSRDg)\u001b8bYJ+\u0007\u000f\\5dCRLwN\\*fgNLwN\\%e\t>,7OT8u)J\fgn]5uS>tGk\u001c)vg\"D#AG@\u0002cQ,7\u000f\u001e$fi\u000eDw+\u001b;i_V$(I]8lKJtu\u000eZ3E_\u0016\u001chj\u001c;Ue\u0006t7/\u001b;j_:$v\u000eU;tQ\"\u00121d`\u0001Wi\u0016\u001cHOR3uG\"<\u0016\u000e\u001e5GS:\fGNU3qY&\u001c\u0017\r^5p]N+7o]5p]&#GK]1og&$\u0018n\u001c8t)>\u0004V\u000f\u001c7B]\u0012\u001c\u0015M\u001c(fm\u0016\u0014HK]1og&$\u0018n\u001c8CC\u000e\\Gk\u001c)vg\"D#\u0001H@\u0002eQ,7\u000f\u001e$fi\u000eDw+\u001b;i\t&4XM]4j]\u001e,\u0005o\\2i\t>,7OT8u)J\fgn]5uS>tGk\u001c)vY2D#!H@\u0002[Q,7\u000f\u001e$fi\u000eDw+\u001b;i\u001d\u0016<(+\u001a9mS\u000e\fW\t]8dQR\u0013\u0018M\\:ji&|gn\u001d+p!VdG\u000e\u000b\u0002\u001f\u007f\u00069D/Z:u\r\u0016$8\r[,ji\"tUm\u001e*fa2L7-Y#q_\u000eDW\u000b\u001d3bi\u0016\u001c(+\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8o\u0013\u0012D#aH@\u0002gQ,7\u000f\u001e$fi\u000eDw+\u001b;i\u001d\u0016<(+\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8o)J\fgn]5uS>t7\u000fV8Qk2d\u0007F\u0001\u0011��\u0003\u0015#Xm\u001d;G_2dwn^3s\r\u0016$8\r[!gi\u0016\u0014HK]1og&$\u0018n\u001c8UQJ|wo\u001d)vg\"\u0014V\r\u001d7jG\u0006$\u0018
n\u001c8Ti\u0006\u0014H/\u001a3Fq\u000e,\u0007\u000f^5p]\"\u0012\u0011e`\u0001Ki\u0016\u001cHOR8mY><XM\u001d$fi\u000eDw+\u001b;i)J\fgn]5uS>tW\u000b\u001d3bi\u0016\u001ch)\u001a;dQN#\u0018\r^3B]\u0012Len\u0019:f[\u0016tGo\u001d%jO\"<\u0016\r^3s[\u0006\u00148\u000e\u000b\u0002#\u007f\u0006qC/Z:u+B$\u0017\r^3G_2dwn^3s\r\u0016$8\r[*uCR,g)\u001a8dKN\u001cF/\u00197f\r\u0016$8\r[3tQ\t\u0019s0A\u0013uKN$xJ\u001c)vg\"\u001cVm]:j_:,e\u000eZ3e\r>\u00148\u000b^1mKN+7o]5p]\"\u0012Ae`\u00010i\u0016\u001cHo\u00148QkND7+Z:tS>tWI\u001c3fIV\u0003H-\u0019;fgJ+\u0007\u000f\\5dCRLwN\\*fgNLwN\u001c\u0015\u0003K}\f!\u0006^3ti>s\u0007+^:i'\u0016\u001c8/[8o\u000b:$W\rZ,iK:\u0004\u0016M\u001d;ji&|g\u000eR3mKR,G\r\u000b\u0002'\u007f\u0006yC/Z:u\u001f:\f\u0005\u000f]3oIJ+7m\u001c:egJ+7\u000f]8og\u0016,\u0006\u000fZ1uKND\u0015n\u001a5XCR,'/\\1sW\"\u0012qe`\u0001/i\u0016\u001cHo\u00148BaB,g\u000e\u001a*fG>\u0014Hm\u001d*fgB|gn]3Va\u0012\fG/Z:M_^<\u0016\r^3s[\u0006\u00148\u000e\u000b\u0002)\u007f\u0006\u0001D/Z:u)JL8i\\7qY\u0016$X\rR3mCf,GMU3rk\u0016\u001cHo]\"p[BdW\r^3t!V\u0014x-\u0019;pefD#!K@\u0002_Q,7\u000f^(o\u0003B\u0004XM\u001c3SK\u000e|'\u000fZ:SKN\u0004xN\\:f/&$\bn\u0015;bY\u0016\u0004Vo\u001d5TKN\u001c\u0018n\u001c8)\u0005)z\u0018a\f;fgR|e.\u00119qK:$'+Z2pe\u0012\u001c(+Z:q_:\u001cXmV5uQ\u0012+G.\u001a;fIB\u000b'\u000f^5uS>t\u0007FA\u0016��\u0003=\u0019X-\u001a3M_\u001e<\u0016\u000e\u001e5ECR\fGcA>\u00030!9!\u0011\u0007\u0017A\u0002\tM\u0012a\u00017pOB!!Q\u0007B\u001d\u001b\t\u00119DC\u0002\u00032qJAAa\u000f\u00038\tY\u0011IY:ue\u0006\u001cG\u000fT8h\u0003}\"Xm\u001d;MK\u0006$WM\u001d'pO\u0006\u0003\b/\u001a8e\u0019&\u001cH/\u001a8fe:{G/\u001b4jKN\u0004Vo\u001d5SKBd\u0017nY1t\r>\u0014H*Z1eKJ\f\u0005\u000f]3oIND#!L@\u0002\u000fR,7\u000f\u001e'fC\u0012,'o\u00144gg\u0016$8\u000fT5ti\u0016tWM\u001d(pi&4\u0017.Z:QkND'+\u001a9mS\u000e\f7OR8s\u0019><7\u000b^1si>3gm]3u\u0013:\u001c'/Z7f]RD#AL@\u0002\rR,7\u000f\u001e'fC\u0012,'o\u00144gg\u0016$8\u000fT5ti\u0016tWM\u001d(pi&4\u0017.Z:QkND'+\u001a
9mS\u000e\f7OR8s\u0011&<\u0007nV1uKJl\u0017M]6J]\u000e\u0014X-\\3oi\"\u0012qf`\u00011i\u0016\u001cH\u000fT3bI\u0016\u0014Hj\\4BaB,g\u000e\u001a'jgR,g.\u001a:JO:|'/Z:G_2dwn^3s\u000bZ,g\u000e^:)\u0005Az\u0018a\u000f;fgRdU-\u00193fe2{w-\u00119qK:$G*[:uK:,'/\u00138dYV$Wm\u001d+sC:\u001c\u0018m\u0019;j_:l\u0015M]6fe\u0006\u0003\b/\u001a8eg\"\u0012\u0011g`\u0001\fCB\u0004XM\u001c3U_2{w\rF\u0004|\u0005+\u00129F!\u0019\t\u000f\tE\"\u00071\u0001\u00034!9!\u0011\f\u001aA\u0002\tm\u0013a\u00037fC\u0012,'/\u00129pG\"\u00042A\u0013B/\u0013\r\u0011yf\u0013\u0002\u0004\u0013:$\bb\u0002B2e\u0001\u0007!1L\u0001\u0006G>,h\u000e^\u0001\u0012CB\u0004XM\u001c3U_B\u000b'\u000f^5uS>tGC\u0002B5\u0005o\u0012\t\t\u0005\u0003\u0003l\tMTB\u0001B7\u0015\u0011\u0011yG!\u001d\u0002\rI,7m\u001c:e\u0015\ty\u0017/\u0003\u0003\u0003v\t5$!D'f[>\u0014\u0018PU3d_J$7\u000fC\u0004\u0003zM\u0002\rAa\u001f\u0002\u0013A\f'\u000f^5uS>t\u0007cA!\u0003~%\u0019!q\u0010\u001e\u0003\u0013A\u000b'\u000f^5uS>t\u0007b\u0002B2g\u0001\u0007!1L\u0001\u0017M\u0016$8\r[!t\u0007\u0006,x\r\u001b;VaJ+\u0007\u000f\\5dCRA!q\u0011BM\u00057\u0013)\u000b\u0005\u0003\u0003\n\nUUB\u0001BF\u0015\u0011\u0011\tD!$\u000b\t\t=%\u0011S\u0001\nS:$XM\u001d8bYNT1Aa%r\u0003\u001d\u0019Ho\u001c:bO\u0016LAAa&\u0003\f\nYAj\\4SK\u0006$\u0017J\u001c4p\u0011\u001d\u0011I\b\u000ea\u0001\u0005wBqA!(5\u0001\u0004\u0011y*\u0001\u0007sKBd\u0017nY1Fa>\u001c\u0007\u000eE\u0002K\u0005CK1Aa)L\u0005\u0011auN\\4\t\u000f\t\u001dF\u00071\u0001\u0003 
\u0006!\"/\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8o\u0013\u0012\f!$Y:tKJ$(+\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8o\u0013\u0012$Ra\u001fBW\u0005cCqAa,6\u0001\u0004\u0011Y&A\u0005sKBd\u0017nY1JI\"9!qU\u001bA\u0002\tM\u0006\u0003\u0002&N\u0005?\u000bA$Y:tKJ$(+\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8o\u001b>$W\rF\u0003|\u0005s\u0013Y\fC\u0004\u00030Z\u0002\rAa\u0017\t\u000f\tuf\u00071\u0001\u0003@\u0006y!/\u001a9mS\u000e\fG/[8o\u001b>$W\r\u0005\u0003\u0003B\n\u001dg\u0002BAb\u0005\u0007LAA!2\u0002F\u0006\u0001\"+\u001a9mS\u000e\fG/[8o'R\fG/Z\u0005\u0005\u0005\u0013\u0014YM\u0001\u0003N_\u0012,'\u0002\u0002Bc\u0003\u000b\fq#Y2uSZ,\u0007+^:i'\u0016\u001c8/[8o\u001fJ4\u0015-\u001b7\u0015\r\tE'q\u001bBm!\u0011\t\u0019Ma5\n\t\tU\u0017Q\u0019\u0002\f!V\u001c\bnU3tg&|g\u000eC\u0004\u00030^\u0002\rAa\u0017\t\u000f\t\u001dv\u00071\u0001\u0003 \u00061b/\u001a:jMf\u0004Vo\u001d5TKN\u001c\u0018n\u001c8F]\u0012,G\rF\u0004|\u0005?\u0014\tOa9\t\u000f\t=\u0006\b1\u0001\u0003\\!9!q\u0015\u001dA\u0002\t}\u0005b\u0002Bsq\u0001\u0007\u0011qG\u0001\u0015g\"|W\u000f\u001c3TK:$WI\u001c3TKN\u001c\u0018n\u001c8")
/* loaded from: input_file:kafka/cluster/PushReplicationPartitionTest.class */
public class PushReplicationPartitionTest extends AbstractPartitionTest {
    /** Mockito mock of {@link KRaftMetadataCache}; tests stub broker lookups on it through {@code mo67metadataCache()}. */
    private final KRaftMetadataCache metadataCache = (KRaftMetadataCache) Mockito.mock(KRaftMetadataCache.class);
    /** Topic-id/partition pair built from {@code topicId()} and the partition index of {@code topicPartition()}. */
    private final TopicIdPartition topicIdPartition = new TopicIdPartition(topicId(), topicPartition().partition());

    /**
     * Supplies the push manager used by this test class.
     *
     * @return a defined {@link Option} wrapping a {@code TestUtils.MockPushManager} constructed with {@code false}
     */
    @Override // kafka.cluster.AbstractPartitionTest
    public Option<TestUtils.MockPushManager> createPushReplicationManager() {
        TestUtils.MockPushManager mockPushManager = new TestUtils.MockPushManager(false);
        return new Some<>(mockPushManager);
    }

    /**
     * Accessor for the mocked metadata cache held by this test.
     * The decompiler renamed this method from {@code metadataCache} because it was merged with a bridge method.
     *
     * @return the {@link KRaftMetadataCache} mock stored in the {@code metadataCache} field
     */
    @Override // kafka.cluster.AbstractPartitionTest
    public KRaftMetadataCache mo67metadataCache() {
        return metadataCache;
    }

    /**
     * Accessor for the topic-id/partition pair created when this test instance was constructed.
     *
     * @return the value of the {@code topicIdPartition} field
     */
    public TopicIdPartition topicIdPartition() {
        return topicIdPartition;
    }

    /**
     * Starts a push session (id 2) for the remote replica, then calls {@code makeLeader} twice:
     * first at the same leader epoch, after which the session and the replica's session id are
     * expected to be unchanged, and then at leader epoch + 1, after which the session is expected
     * to have ended and the replica's replication session id to be -1.
     */
    @Test
    public void testMakeLeaderResetsReplicationSessionOnNewLeaderEpoch() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;
        final long resetSessionId = -1L;

        KRaftMetadataCache cache = mo67metadataCache();
        Mockito.when(cache.getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(replicaEpoch)));
        Mockito.when(cache.getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(remoteReplicaId(), sessionId);

        // A third replica id that must differ from the local broker id.
        int addedReplicaId = remoteReplicaId() + 1;
        Assertions.assertNotEquals(brokerId(), addedReplicaId);

        // First transition: same leader epoch, partition epoch 2, one extra replica.
        Partition sameEpochLeader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState sameEpochState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()),
                            new $colon.colon(Predef$.MODULE$.int2Integer(addedReplicaId), Nil$.MODULE$)))).asJava())
                .setIsNew(false);
        boolean leaderChangedOnSameEpoch = sameEpochLeader.makeLeader(
            sameEpochState, offsetCheckpoints(), new Some(topicId()), sameEpochLeader.makeLeader$default$4());
        Assertions.assertFalse(leaderChangedOnSameEpoch, "Expected become leader transition not to change leader");

        // The existing session is still active; the added replica reports session id -1.
        activePushSessionOrFail(remoteReplicaId(), sessionId);
        Assertions.assertTrue(partition().getReplica(remoteReplicaId()).isDefined());
        assertReplicationSessionId(remoteReplicaId(), new Some<>(BoxesRunTime.boxToLong(sessionId)));
        assertReplicationSessionId(addedReplicaId, new Some<>(BoxesRunTime.boxToLong(resetSessionId)));

        // Second transition: leader epoch bumped by one, partition epoch 3.
        Partition bumpedEpochLeader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState bumpedEpochState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch + 1)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(3)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()),
                            new $colon.colon(Predef$.MODULE$.int2Integer(addedReplicaId), Nil$.MODULE$)))).asJava())
                .setIsNew(false);
        boolean leaderChangedOnBumpedEpoch = bumpedEpochLeader.makeLeader(
            bumpedEpochState, offsetCheckpoints(), new Some(topicId()), bumpedEpochLeader.makeLeader$default$4());
        Assertions.assertFalse(leaderChangedOnBumpedEpoch, "Expected become leader transition not to change leader");

        // After the epoch bump both followers report session id -1 and the old session has ended.
        assertReplicationSessionId(remoteReplicaId(), new Some<>(BoxesRunTime.boxToLong(resetSessionId)));
        verifyPushSessionEnded(remoteReplicaId(), sessionId, false);
        assertReplicationSessionId(addedReplicaId, new Some<>(BoxesRunTime.boxToLong(resetSessionId)));
    }

    /**
     * Starts a push session (id 2) for the remote replica, stubs the metadata cache to report
     * that replica as fenced, and applies a {@code makeLeader} whose ISR contains only the local
     * broker. The session is then expected to have ended ({@code verifyPushSessionEnded} is
     * called with {@code true} as its last argument).
     */
    @Test
    public void testMakeLeaderEndsPushSessionsWhenFencingReplicas() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;

        KRaftMetadataCache cache = mo67metadataCache();
        Mockito.when(cache.getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(replicaEpoch)));
        Mockito.when(cache.getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(remoteReplicaId(), sessionId);

        int addedReplicaId = remoteReplicaId() + 1;
        Assertions.assertNotEquals(brokerId(), addedReplicaId);

        // The replica keeps an alive broker epoch but is now reported as fenced.
        Mockito.when(cache.getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(new Some(BoxesRunTime.boxToLong(replicaEpoch)));
        Mockito.when(cache.isBrokerFenced(remoteReplicaId())).thenReturn(true);

        // Same leader epoch; the ISR lists only the local broker.
        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState shrunkIsrState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()),
                            new $colon.colon(Predef$.MODULE$.int2Integer(addedReplicaId), Nil$.MODULE$)))).asJava())
                .setIsNew(false);
        boolean leaderChanged = leader.makeLeader(
            shrunkIsrState, offsetCheckpoints(), None$.MODULE$, leader.makeLeader$default$4());
        Assertions.assertFalse(leaderChanged, "Expected become leader transition not to change leader");

        verifyPushSessionEnded(remoteReplicaId(), sessionId, true);
    }

    /**
     * Starts a push session (id 2) for the remote replica, stubs the metadata cache to report
     * that replica as both fenced and shutting down, and applies a {@code makeLeader} whose ISR
     * contains only the local broker. The session is then expected to have ended
     * ({@code verifyPushSessionEnded} is called with {@code false} as its last argument).
     */
    @Test
    public void testMakeLeaderEndsPushSessionWhenReplicasShutDown() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;

        KRaftMetadataCache cache = mo67metadataCache();
        Mockito.when(cache.getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(replicaEpoch)));
        Mockito.when(cache.getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(remoteReplicaId(), sessionId);

        int addedReplicaId = remoteReplicaId() + 1;
        Assertions.assertNotEquals(brokerId(), addedReplicaId);

        // The replica is reported as fenced and in the middle of shutting down.
        Mockito.when(cache.isBrokerFenced(remoteReplicaId())).thenReturn(true);
        Mockito.when(cache.isBrokerShuttingDown(remoteReplicaId())).thenReturn(true);

        // Same leader epoch; the ISR lists only the local broker.
        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState shrunkIsrState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()),
                            new $colon.colon(Predef$.MODULE$.int2Integer(addedReplicaId), Nil$.MODULE$)))).asJava())
                .setIsNew(false);
        boolean leaderChanged = leader.makeLeader(
            shrunkIsrState, offsetCheckpoints(), None$.MODULE$, leader.makeLeader$default$4());
        Assertions.assertFalse(leaderChanged, "Expected become leader transition not to change leader");

        verifyPushSessionEnded(remoteReplicaId(), sessionId, false);
    }

    /**
     * Starts a push session (id 2) for the remote replica and then applies a {@code makeLeader}
     * whose state carries a cluster link id, the {@code TopicLinkMirror} link topic state and a
     * linked leader epoch of -1. The session is expected to have ended, and a subsequent
     * caught-up fetch with session id 3 is expected to complete without throwing.
     */
    @Test
    public void testMakeLeaderEndsPushSessionWhenPromotingLinkLeader() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;

        KRaftMetadataCache cache = mo67metadataCache();
        Mockito.when(cache.getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(replicaEpoch)));
        Mockito.when(cache.getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(remoteReplicaId(), sessionId);

        int addedReplicaId = remoteReplicaId() + 1;
        Assertions.assertNotEquals(brokerId(), addedReplicaId);

        Mockito.when(cache.getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(new Some(BoxesRunTime.boxToLong(replicaEpoch)));

        // Same leader epoch and ISR, but the state now carries cluster-link mirror settings.
        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState mirrorState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()),
                            new $colon.colon(Predef$.MODULE$.int2Integer(addedReplicaId), Nil$.MODULE$)))).asJava())
                .setClusterLinkId(clusterLinkId().toString())
                .setClusterLinkTopicState(TopicLinkMirror$.MODULE$.name())
                .setLinkedLeaderEpoch(-1)
                .setIsNew(false);
        boolean leaderChanged = leader.makeLeader(
            mirrorState, offsetCheckpoints(), None$.MODULE$, leader.makeLeader$default$4());
        Assertions.assertFalse(leaderChanged, "Expected become leader transition not to change leader");

        verifyPushSessionEnded(remoteReplicaId(), sessionId, true);

        // A later fetch with a new session id (3) must not throw.
        fetchAsCaughtUpReplica(partition(), replicaEpoch, 3L);
    }

    /**
     * Starts a push session (id 2), ends it by applying a {@code makeLeader} carrying cluster-link
     * mirror settings, and then raises the alive broker epoch stubbed for the remote replica.
     * A follower fetch that still uses the old replica epoch and session id is expected to fail
     * with {@link NotLeaderOrFollowerException}; fetches with newer replica epochs are expected to
     * succeed and to update the replica's recorded broker epoch and replication session id.
     */
    @Test
    public void testFetchAfterPushReplicationDisabledAndBrokerEpochBumped() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;

        KRaftMetadataCache cache = mo67metadataCache();
        Mockito.when(cache.getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(cache.getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(remoteReplicaId(), 2L);

        // Same leader epoch and ISR, but the state now carries cluster-link mirror settings.
        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState mirrorState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setClusterLinkId(clusterLinkId().toString())
                .setClusterLinkTopicState(TopicLinkMirror$.MODULE$.name())
                .setLinkedLeaderEpoch(-1)
                .setIsNew(false);
        boolean leaderChanged = leader.makeLeader(
            mirrorState, offsetCheckpoints(), None$.MODULE$, leader.makeLeader$default$4());
        Assertions.assertFalse(leaderChanged, "Expected become leader transition not to change leader");
        verifyPushSessionEnded(remoteReplicaId(), 2L, true);

        // Raise the alive broker epoch reported for the remote replica from 1 to 2.
        final long bumpedEpoch = replicaEpoch + 1;
        Mockito.when(cache.getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(bumpedEpoch)));

        // A fetch that still carries the old replica epoch and session id must be rejected.
        Assertions.assertThrows(
            NotLeaderOrFollowerException.class,
            () -> fetchFollower(
                partition(),
                remoteReplicaId(),
                partition().localLogOrException().logEndOffset(),
                fetchFollower$default$4(),
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(partition().getLeaderEpoch())),
                fetchFollower$default$7(),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(replicaEpoch)),
                sessionId));

        // Fetch with the bumped replica epoch and session id 0.
        fetchFollower(
            partition(),
            remoteReplicaId(),
            partition().localLogOrException().logEndOffset(),
            fetchFollower$default$4(),
            fetchFollower$default$5(),
            new Some(BoxesRunTime.boxToInteger(partition().getLeaderEpoch())),
            fetchFollower$default$7(),
            fetchFollower$default$8(),
            fetchFollower$default$9(),
            new Some(BoxesRunTime.boxToLong(bumpedEpoch)),
            0L);
        Assertions.assertEquals(
            new Some(BoxesRunTime.boxToLong(bumpedEpoch)),
            ((Replica) partition().getReplica(remoteReplicaId()).get()).stateSnapshot().brokerEpoch());
        assertReplicationSessionId(remoteReplicaId(), new Some<>(BoxesRunTime.boxToLong(0L)));

        // Fetch again with an even newer replica epoch (bumped + 2) and session id 1.
        final long newerEpoch = bumpedEpoch + 2;
        fetchFollower(
            partition(),
            remoteReplicaId(),
            partition().localLogOrException().logEndOffset(),
            fetchFollower$default$4(),
            fetchFollower$default$5(),
            new Some(BoxesRunTime.boxToInteger(partition().getLeaderEpoch())),
            fetchFollower$default$7(),
            fetchFollower$default$8(),
            fetchFollower$default$9(),
            new Some(BoxesRunTime.boxToLong(newerEpoch)),
            1L);
        Assertions.assertEquals(
            new Some(BoxesRunTime.boxToLong(newerEpoch)),
            ((Replica) partition().getReplica(remoteReplicaId()).get()).stateSnapshot().brokerEpoch());
        assertReplicationSessionId(remoteReplicaId(), new Some<>(BoxesRunTime.boxToLong(1L)));
    }

    /**
     * Starts a push session (id 2) for the remote replica, appends one record to the leader,
     * advances the mock clock past {@code replicaLagTimeMaxMs()}, triggers
     * {@code maybeShrinkIsr()} and completes the pending ISR update with partition epoch 2.
     * The session is then expected to have ended ({@code verifyPushSessionEnded} is called with
     * {@code true} as its last argument).
     */
    @Test
    public void testAlterPartitionIsrShrinkEndsPushSession() {
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName())).thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);
        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L);
        });
        activePushSessionOrFail(remoteReplicaId(), 2L);
        Partition partition = partition();
        // The record payload is the current mock-clock time rendered as text; the charset is
        // given explicitly so the bytes do not depend on the platform default encoding.
        byte[] payload = Long.toString(time().milliseconds()).getBytes(java.nio.charset.StandardCharsets.UTF_8);
        MemoryRecords withRecords = MemoryRecords.withRecords((byte) 2, 0L, Compression.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, 5, false, new SimpleRecord[]{new SimpleRecord(payload)});
        partition.appendRecordsToLeader(withRecords, AppendOrigin.CLIENT, 0, RequestLocal$.MODULE$.withThreadConfinedCaching(), partition.appendRecordsToLeader$default$5(), partition.appendRecordsToLeader$default$6());
        // Move the clock just beyond the replica lag limit before asking for an ISR shrink.
        time().sleep(partition().replicaLagTimeMaxMs() + 1);
        partition().maybeShrinkIsr();
        alterPartitionManager().completeIsrUpdate(2);
        verifyPushSessionEnded(remoteReplicaId(), 2L, true);
    }

    /**
     * Starts a push session (id 2) for the remote replica and then deletes the partition.
     * The session is expected to have ended ({@code verifyPushSessionEnded} is called with
     * {@code false} as its last argument).
     */
    @Test
    public void testDeletePartitionEndsPushSessions() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;

        KRaftMetadataCache cache = mo67metadataCache();
        Mockito.when(cache.getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(replicaEpoch)));
        Mockito.when(cache.getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(remoteReplicaId(), sessionId);

        partition().delete();

        verifyPushSessionEnded(remoteReplicaId(), sessionId, false);
    }

    /**
     * Starts a push session (id 2) for the remote replica and then marks the partition offline.
     * The session is expected to have ended ({@code verifyPushSessionEnded} is called with
     * {@code false} as its last argument).
     */
    @Test
    public void testMarkPartitionOfflineEndsPushSessions() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;

        KRaftMetadataCache cache = mo67metadataCache();
        Mockito.when(cache.getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(replicaEpoch)));
        Mockito.when(cache.getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(remoteReplicaId(), sessionId);

        partition().markOffline();

        verifyPushSessionEnded(remoteReplicaId(), sessionId, false);
    }

    /**
     * Races a follower fetch against a concurrent leadership transition.
     *
     * <p>Two tasks are submitted to the test executor: one issues a follower fetch at offset 0 for
     * leader epoch 5, the other runs the leadership transition helper defined elsewhere in this
     * class. Both are released together through a semaphore. The fetch must fail with either
     * {@link FencedLeaderEpochException} or {@link PushReplicationStartedException}, and no push
     * session may remain registered afterwards.
     *
     * @param z parameter forwarded to the leadership transition helper; the transition result is
     *          asserted to differ from it, and it selects the expected replication session id
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testConcurrentMakeLeaderWithCaughtUpFetch(boolean z) {
        final int leaderEpoch = 5;
        setupPartitionWithMocks(leaderEpoch, true);
        final Semaphore startGate = new Semaphore(0);

        Future<?> fetchResult = executor().submit(() -> {
            startGate.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 0L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(leaderEpoch)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        });
        Future<?> transitionResult = executor().submit(() -> {
            return BoxesRunTime.boxToBoolean($anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$2(this, startGate, z, leaderEpoch));
        });

        // Poll the readiness predicate every 100 ms, giving up after 15 seconds.
        final long waitStartMs = System.currentTimeMillis();
        while (!$anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$3(startGate)) {
            if (System.currentTimeMillis() > waitStartMs + 15000) {
                Assertions.fail("Timed out waiting for threads to prepare.");
            }
            Thread.sleep(100L);
        }

        // Let both tasks proceed at the same time.
        startGate.release(2);

        Assertions.assertNotEquals(BoxesRunTime.boxToBoolean(z), transitionResult.get(15L, TimeUnit.SECONDS), "Unexpected leadership change");
        Throwable cause = Assertions.assertThrows(Exception.class, () -> {
            fetchResult.get(15L, TimeUnit.SECONDS);
        }).getCause();
        Assertions.assertTrue((cause instanceof FencedLeaderEpochException) || (cause instanceof PushReplicationStartedException));
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pushManager -> {
            return pushManager.pushReplicationSessions();
        }).get()).isEmpty());

        // Session teardown is only verified when the fetch was not fenced on leader epoch.
        if (!(cause instanceof FencedLeaderEpochException)) {
            verifyPushSessionEnded(remoteReplicaId(), 2L, true);
        }
        assertReplicationSessionId(remoteReplicaId(), z ? new Some(BoxesRunTime.boxToLong(-1L)) : None$.MODULE$);
    }

    /**
     * A {@code makeFollower} transition at the next leader epoch must end the push session that a
     * caught-up follower fetch started.
     */
    @Test
    public void testMakeFollowerEndsReplicationSession() {
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        activePushSessionOrFail(remoteReplicaId(), 0L);

        Partition current = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState followerState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
            .setControllerEpoch(0)
            .setLeader(brokerId())
            .setLeaderEpoch(5 + 1)
            .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
            .setPartitionEpoch(2)
            .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
            .setIsNew(false);
        Assertions.assertTrue(
            current.makeFollower(followerState, offsetCheckpoints(), new Some(topicId()), current.makeFollower$default$4()));

        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
    }

    /**
     * Races a caught-up fetch against a fetch carrying a newer replica epoch.
     *
     * <p>The log is seeded so that its end offset is 17. One task fetches at offset 17 with replica
     * epoch 1 and session id 2; the other fetches at offset 12 with replica epoch 2 and session id
     * 0. Both are released together through a semaphore. Whatever the interleaving, the replica
     * must end up with session id 0 and no push session may remain registered. If the first fetch
     * did start a push session, that session must have been ended.
     */
    @Test
    public void testConcurrentCaughtUpFetchAndNewReplicaEpochFetch() {
        LogManager logManager = logManager();
        AbstractLog log = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$, logManager.getOrCreateLog$default$5());
        seedLogWithData(log);
        Assertions.assertEquals(17L, log.logEndOffset());

        final int leaderEpoch = 10;
        // Consecutive stubbing: the first lookup reports broker epoch 1, later lookups report 2.
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L))).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(2L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName())).thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        final Semaphore startGate = new Semaphore(0);
        Future<?> caughtUpFetch = executor().submit(() -> {
            startGate.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 17L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(leaderEpoch)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 2L);
        });
        Future<?> newEpochFetch = executor().submit(() -> {
            startGate.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 12L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(leaderEpoch)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(2L)), 0L);
        });

        // Poll the readiness predicate every 100 ms, giving up after 15 seconds.
        final long waitStartMs = System.currentTimeMillis();
        while (!$anonfun$testConcurrentCaughtUpFetchAndNewReplicaEpochFetch$3(startGate)) {
            if (System.currentTimeMillis() > waitStartMs + 15000) {
                Assertions.fail("Timed out waiting for threads to prepare.");
            }
            Thread.sleep(100L);
        }
        startGate.release(2);

        // The caught-up fetch may or may not win the race. Record whether it started a push
        // session; any other failure cause of that fetch is deliberately tolerated here.
        boolean pushStarted = false;
        try {
            caughtUpFetch.get(15L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            pushStarted = e.getCause() instanceof PushReplicationStartedException;
        }
        newEpochFetch.get(15L, TimeUnit.SECONDS);

        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));
        Assertions.assertTrue(((TestUtils.MockPushManager) pushReplicationManager().get()).pushReplicationSessions().isEmpty());
        if (pushStarted) {
            verifyPushSessionEnded(remoteReplicaId(), 2L, false);
        }
    }

    /**
     * Races a caught-up fetch against a fetch carrying a newer replication session id.
     *
     * <p>The log is seeded so that its end offset is 17. One task fetches at offset 17 with session
     * id 2; the other fetches at offset 12 with session id 3, both using replica epoch 1. Both are
     * released together through a semaphore. Whatever the interleaving, the replica must end up
     * with session id 3 and no push session may remain registered. If the first fetch did start a
     * push session, that session must have been ended.
     */
    @Test
    public void testConcurrentCaughtUpFetchAndNewReplicaSessionFetch() {
        LogManager logManager = logManager();
        AbstractLog log = logManager.getOrCreateLog(topicPartition(), logManager.getOrCreateLog$default$2(), logManager.getOrCreateLog$default$3(), None$.MODULE$, logManager.getOrCreateLog$default$5());
        seedLogWithData(log);
        Assertions.assertEquals(17L, log.logEndOffset());

        final int leaderEpoch = 10;
        setupPartitionWithMocks(leaderEpoch, true);

        final Semaphore startGate = new Semaphore(0);
        Future<?> caughtUpFetch = executor().submit(() -> {
            startGate.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 17L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(leaderEpoch)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 2L);
        });
        // Stubbing happens while the first task is still parked on the semaphore.
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName())).thenReturn(replicaNode());
        Future<?> newSessionFetch = executor().submit(() -> {
            startGate.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 12L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(leaderEpoch)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 3L);
        });

        // Poll the readiness predicate every 100 ms, giving up after 15 seconds.
        final long waitStartMs = System.currentTimeMillis();
        while (!$anonfun$testConcurrentCaughtUpFetchAndNewReplicaSessionFetch$3(startGate)) {
            if (System.currentTimeMillis() > waitStartMs + 15000) {
                Assertions.fail("Timed out waiting for threads to prepare.");
            }
            Thread.sleep(100L);
        }
        startGate.release(2);

        // The caught-up fetch may or may not win the race. Record whether it started a push
        // session; any other failure cause of that fetch is deliberately tolerated here.
        boolean pushStarted = false;
        try {
            caughtUpFetch.get(15L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            pushStarted = e.getCause() instanceof PushReplicationStartedException;
        }
        newSessionFetch.get(15L, TimeUnit.SECONDS);

        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(3L)));
        Assertions.assertTrue(((TestUtils.MockPushManager) pushReplicationManager().get()).pushReplicationSessions().isEmpty());
        if (pushStarted) {
            verifyPushSessionEnded(remoteReplicaId(), 2L, false);
        }
    }

    /**
     * Walks a follower from lagging to fully caught up and checks when push replication starts.
     *
     * <p>The follower fetches at offsets 5, 17, 20 and 23 while the leader keeps appending; none of
     * those fetches creates a push session. Only the final fetch at offset 26, which equals the
     * log end offset at that point, is answered with {@link PushReplicationStartedException} and
     * leaves a registered push session.
     */
    @Test
    public void testFetchTransitionsToPushWhenFullyCaughtUp() {
        LogManager logs = logManager();
        AbstractLog log = logs.getOrCreateLog(topicPartition(), logs.getOrCreateLog$default$2(), logs.getOrCreateLog$default$3(), None$.MODULE$, logs.getOrCreateLog$default$5());
        seedLogWithData(log);
        final int leaderEpoch = 10;
        final long logStartOffset = 0;

        // Become leader with only the local broker in the ISR.
        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState leaderState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
            .setControllerEpoch(0)
            .setLeader(brokerId())
            .setLeaderEpoch(10)
            .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava())
            .setPartitionEpoch(1)
            .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
            .setIsNew(true);
        Assertions.assertTrue(
            leader.makeLeader(leaderState, offsetCheckpoints(), new Some(topicId()), leader.makeLeader$default$4()),
            "Expected become leader transition to succeed");
        Assertions.assertEquals(10, partition().getLeaderEpoch());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId()})), partition().inSyncReplicaIds());
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Assertions.assertEquals(17L, log.logEndOffset());

        // Fetch at offset 5: the ISR is unchanged and no push session exists.
        fetchFollower(partition(), remoteReplicaId(), 5L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId()})), partition().inSyncReplicaIds());
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());
        Assertions.assertEquals(0, ((TestUtils.MockPushManager) pushReplicationManager().get()).followersNotCatchingUp());

        // Append three records, then fetch at the previous log end offset.
        appendToPartition(partition(), 3);
        Assertions.assertEquals(20L, log.logEndOffset());
        fetchFollower(partition(), remoteReplicaId(), 17L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        Assertions.assertEquals(0, ((TestUtils.MockPushManager) pushReplicationManager().get()).followersNotCatchingUp());

        // Append again, fetch at offset 20, and complete the pending ISR update.
        appendToPartition(partition(), 3);
        Assertions.assertEquals(20L, log.highWatermark());
        Assertions.assertEquals(23L, log.logEndOffset());
        fetchFollower(partition(), remoteReplicaId(), 20L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        alterPartitionManager().completeIsrUpdate(2);
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId(), remoteReplicaId()})), partition().inSyncReplicaIds());
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());
        Assertions.assertEquals(0, ((TestUtils.MockPushManager) pushReplicationManager().get()).followersNotCatchingUp());

        // Fetch at offset 23 while the log end offset is 26: still no push session.
        appendToPartition(partition(), 3);
        Assertions.assertEquals(26L, log.logEndOffset());
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        fetchFollower(partition(), remoteReplicaId(), 23L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());
        Assertions.assertEquals(1, ((TestUtils.MockPushManager) pushReplicationManager().get()).followersNotCatchingUp());

        // Fetch at offset 26 (the log end offset): push replication starts.
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchFollower(partition(), remoteReplicaId(), 26L, logStartOffset, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(leaderEpoch)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L));
        Assertions.assertFalse(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());
        Assertions.assertEquals(1, ((TestUtils.MockPushManager) pushReplicationManager().get()).followersNotCatchingUp());
    }

    /**
     * A caught-up fetch starts push replication only after the partition was given a topic id.
     *
     * <p>The first {@code makeLeader} call passes no topic id and the following caught-up fetch
     * completes normally. The second call supplies the topic id (and is expected to return
     * {@code false}); the next caught-up fetch then raises
     * {@link PushReplicationStartedException}.
     */
    @Test
    public void testFetchTransitionsToPushOnlyWithDefinedTopicId() {
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(brokerId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());

        // Leader transition without a topic id.
        Partition withoutTopicId = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState initialState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
            .setControllerEpoch(0)
            .setLeader(brokerId())
            .setLeaderEpoch(5)
            .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
            .setPartitionEpoch(1)
            .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
            .setIsNew(true);
        Assertions.assertTrue(
            withoutTopicId.makeLeader(initialState, offsetCheckpoints(), None$.MODULE$, withoutTopicId.makeLeader$default$4()),
            "Expected become leader transition to succeed");
        fetchAsCaughtUpReplica(partition(), 1L, 0L);

        // Same leader at the next epoch, this time with the topic id supplied.
        Partition withTopicId = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState nextEpochState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
            .setControllerEpoch(0)
            .setLeader(brokerId())
            .setLeaderEpoch(5 + 1)
            .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
            .setPartitionEpoch(1)
            .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
            .setIsNew(false);
        Assertions.assertFalse(
            withTopicId.makeLeader(nextEpochState, offsetCheckpoints(), new Some(topicId()), withTopicId.makeLeader$default$4()),
            "Partition leader ID should not change");

        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
    }

    /**
     * With the test's default push manager, a caught-up fetch on an internal topic partition
     * completes without raising an exception.
     *
     * @param str name of the internal topic under test
     */
    @ValueSource(strings = {"__consumer_offsets", "__transaction_state", "_confluent-tier-state", "_confluent-link-metadata", "_confluent-quotas"})
    @ParameterizedTest
    public void testFetchDoesNotTransitionInternalTopicsToPushByDefault(String str) {
        // Build the internal-topic partition around the default push manager.
        setupInternalTopicPartition(str, pushReplicationManager());

        // Must return normally, i.e. no PushReplicationStartedException.
        fetchAsCaughtUpReplica(partition(), 1L, 0L);
    }

    /**
     * With a {@code MockPushManager} constructed with {@code true}, a caught-up fetch on an
     * internal topic partition raises {@link PushReplicationStartedException}.
     *
     * @param str name of the internal topic under test
     */
    @ValueSource(strings = {"__consumer_offsets", "__transaction_state", "_confluent-tier-state", "_confluent-link-metadata", "_confluent-quotas"})
    @ParameterizedTest
    public void testFetchTransitionsInternalTopicsToPushWhenEnabled(String str) {
        setupInternalTopicPartition(str, new Some(new TestUtils.MockPushManager(true)));

        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
    }

    /**
     * Replaces the partition under test with partition 0 of the given topic and makes the local
     * broker its leader at epoch 5, with the local broker and the remote replica as both ISR and
     * replica set.
     *
     * @param str    topic name for the new partition
     * @param option push manager handed to the {@code Partition} constructor
     */
    private void setupInternalTopicPartition(String str, Option<PushManager> option) {
        TopicPartition topicPartition = new TopicPartition(str, 0);
        // NOTE(review): the retention override always targets "_confluent-tier-state", regardless
        // of str -- confirm this is intentional.
        configRepository().setTopicConfig("_confluent-tier-state", "retention.ms", Integer.toString(-1));

        // Collaborators are read in the same order as before; only the unused locals are gone.
        MetadataVersion protocolVersion = interBrokerProtocolVersion();
        int localBrokerId = brokerId();
        JFunction0.mcJ.sp brokerEpochSupplier = () -> {
            return this.defaultBrokerEpoch(this.brokerId());
        };
        MockTime clock = time();
        AbstractPartitionTest.MockAlterPartitionListener isrListener = alterPartitionListener();
        DelayedOperations delayedOps = delayedOperations();
        KRaftMetadataCache metadataCache = mo67metadataCache();
        LogManager logs = logManager();
        Some tierReplicaManagerOpt = new Some(tierReplicaManager());
        TestUtils.MockAlterPartitionManager isrManager = alterPartitionManager();
        ListenerName listenerName = interBrokerListenerName();

        partition_$eq(new Partition(topicPartition, 30000L, protocolVersion, listenerName, localBrokerId, brokerEpochSupplier, clock, isrListener, delayedOps, metadataCache, logs, tierReplicaManagerOpt, None$.MODULE$, None$.MODULE$, isrManager, None$.MODULE$, false, None$.MODULE$, option, None$.MODULE$));

        Mockito.when(offsetCheckpoints().fetch(ArgumentMatchers.anyString(), (TopicPartition) ArgumentMatchers.eq(topicPartition))).thenReturn(None$.MODULE$);
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName())).thenReturn(replicaNode());

        Partition partition = partition();
        Assertions.assertTrue(partition.makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(1).setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(true), offsetCheckpoints(), new Some(topicId()), partition.makeLeader$default$4()), "Expected become leader transition to succeed");
    }

    /**
     * Push replication must not start while the follower's ISR membership is still in flight.
     *
     * <p>Two fetches at offset 17 issued before the ISR update completes leave no push session.
     * After {@code completeIsrUpdate} the same fetch raises
     * {@link PushReplicationStartedException}.
     */
    @Test
    public void testFetchTransitionsToPushWhenInCommittedIsr() {
        LogManager logs = logManager();
        seedLogWithData(logs.getOrCreateLog(topicPartition(), logs.getOrCreateLog$default$2(), logs.getOrCreateLog$default$3(), None$.MODULE$, logs.getOrCreateLog$default$5()));
        final int leaderEpoch = 10;
        final long logStartOffset = 0;

        // Become leader with only the local broker in the ISR.
        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState leaderState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
            .setControllerEpoch(0)
            .setLeader(brokerId())
            .setLeaderEpoch(10)
            .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava())
            .setPartitionEpoch(1)
            .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
            .setIsNew(true);
        Assertions.assertTrue(
            leader.makeLeader(leaderState, offsetCheckpoints(), new Some(topicId()), leader.makeLeader$default$4()),
            "Expected become leader transition to succeed");
        Assertions.assertEquals(10, partition().getLeaderEpoch());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId()})), partition().inSyncReplicaIds());
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());

        // First fetch at offset 17: an ISR expansion is in flight, no push session yet.
        fetchFollower(partition(), remoteReplicaId(), 17L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        alterPartitionManager().assertInFlightLeaderAndIsr((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId(), remoteReplicaId()})), 10, 1);
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());

        // Second fetch before the update completes: still no push session.
        fetchFollower(partition(), remoteReplicaId(), 17L, 0L, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(10)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());

        // Once the ISR update completes, the next fetch starts push replication.
        alterPartitionManager().completeIsrUpdate(2);
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId(), remoteReplicaId()})), partition().inSyncReplicaIds());
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchFollower(partition(), remoteReplicaId(), 17L, logStartOffset, fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(leaderEpoch)), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L));
    }

    /**
     * A caught-up fetch with session id -1 must not create a push session, whereas the same fetch
     * with session id 0 does.
     */
    @Test
    public void testFetchWithInvalidSessionIdDoesNotTransitionToPush() {
        setupPartitionWithMocks(5, true);
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());

        // Session id -1: the fetch returns normally and nothing is registered.
        fetchAsCaughtUpReplica(partition(), 1L, -1L);
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());

        // Session id 0: push replication starts and a session is registered.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        Assertions.assertFalse(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());
    }

    /**
     * A caught-up fetch with session id {@code Long.MAX_VALUE} is served without creating a push
     * session, and a later fetch with session id 0 is rejected with
     * {@link FencedReplicationSessionIdException}.
     */
    @Test
    public void testFetchWithFinalReplicationSessionIdDoesNotTransitionToPush() {
        final long replicaEpoch = 1;
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);

        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(Long.MAX_VALUE)));
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());

        // Repeating the fetch with the same session id is still accepted.
        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);

        // Going back to session id 0 is fenced.
        Assertions.assertThrows(
            FencedReplicationSessionIdException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, 0L));
    }

    /**
     * While the metadata cache reports no alive node for the follower, a caught-up fetch must not
     * create a push session; once a node is reported, the same fetch starts push replication.
     */
    @Test
    public void testFetchWithoutBrokerNodeDoesNotTransitionToPush() {
        setupPartitionWithMocks(5, true);
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));

        // No alive broker node: the fetch returns normally and nothing is registered.
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(Option$.MODULE$.empty());
        fetchAsCaughtUpReplica(partition(), 1L, 0L);
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());

        // Broker node available: push replication starts and a session is registered.
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        Assertions.assertFalse(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());
    }

    /**
     * After push replication has started under session id 0, a fetch with session id
     * {@code Long.MAX_VALUE} ends that push session, and a further fetch with the same id leaves
     * no push session registered.
     */
    @Test
    public void testFetchWithFinalReplicationSessionIdTransitionsToPullAndCanNeverTransitionBackToPush() {
        final long replicaEpoch = 1;
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        // Start push replication under session id 0.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, 0L));

        // Fetching with Long.MAX_VALUE as session id ends the session with id 0.
        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(Long.MAX_VALUE)));
        TestUtils.MockPushManager pushManager = (TestUtils.MockPushManager) pushReplicationManager().get();
        pushManager.verifyPushSessionEnded(new TestUtils.TopicIdPartitionReplica(topicIdPartition(), remoteReplicaId()), 0L, false);

        // A further caught-up fetch does not register a new push session.
        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());
    }

    /**
     * Issues a series of reads through the {@code read$1} helper (defined elsewhere in this class)
     * with its boolean flag set to {@code true} and checks that none of them registers a push
     * session; a final read with the flag set to {@code false} must raise
     * {@link PushReplicationStartedException} and register one.
     */
    @Test
    public void testFetchWithDivergingEpochDoesNotTransitionToPull() {
        LogManager logs = logManager();
        seedLogWithData(logs.getOrCreateLog(topicPartition(), logs.getOrCreateLog$default$2(), logs.getOrCreateLog$default$3(), None$.MODULE$, logs.getOrCreateLog$default$5()));
        final int leaderEpoch = 10;
        final long logStartOffset = 0;
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        Partition leader = setupPartitionWithMocks(10, true);

        // Reads flagged true: none of them may leave a push session behind.
        read$1(2, 5L, true, leader, 0L, 10);
        read$1(0, 4L, true, leader, 0L, 10);
        read$1(6, 6L, true, leader, 0L, 10);
        read$1(8, 17L, true, leader, 0L, 10);
        read$1(10, 18L, true, leader, 0L, 10);
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());

        // Read flagged false: push replication starts.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> read$1(9, 17L, false, leader, logStartOffset, leaderEpoch));
        Assertions.assertFalse(((IterableOnceOps) pushReplicationManager().map(pm -> pm.pushReplicationSessions()).get()).isEmpty());
    }

    /**
     * A fetch carrying a newer replica epoch ends the running push session, after which a fetch
     * with the old replica epoch is rejected with {@link NotLeaderOrFollowerException}.
     */
    @Test
    public void testFetchWithNewReplicaEpochTransitionsToPull() {
        // Consecutive stubbing: the first lookup reports broker epoch 1, later lookups report 5.
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(5L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        // Replica epoch 1, session id 1: push replication starts.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), 1L, 1L));

        // Replica epoch 5: served normally, and the session with id 1 is ended.
        fetchAsCaughtUpReplica(partition(), 5L, 0L);
        verifyPushSessionEnded(remoteReplicaId(), 1L, false);

        // The old replica epoch is no longer accepted.
        Assertions.assertThrows(
            NotLeaderOrFollowerException.class,
            () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
    }

    /**
     * Tracks the replication session id recorded for the follower across a sequence of fetches
     * with changing replica epochs and session ids, checking after each step which id is stored
     * and which push session was ended.
     */
    @Test
    public void testFetchWithNewReplicaEpochUpdatesReplicationSessionId() {
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        // Replica epoch 1, session id 0: push replication starts.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));

        // Same replica epoch, session id 1: session 0 ends and id 1 is recorded.
        fetchAsCaughtUpReplica(partition(), 1L, 1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));

        // Replica epoch 2 with session id 0: the recorded id goes back to 0.
        fetchAsCaughtUpReplica(partition(), 2L, 0L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));

        // Replica epoch 2, session id 1: push replication starts again under id 1.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), 2L, 1L));
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));

        // Replica epoch 3 with session id 0: session 1 ends and id 0 is recorded.
        fetchAsCaughtUpReplica(partition(), 3L, 0L);
        verifyPushSessionEnded(remoteReplicaId(), 1L, false);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));
    }

    /**
     * After push replication has started, a fetch with a different last argument succeeds and
     * ends the push session; repeating the earlier fetch is then rejected with
     * {@link FencedReplicationSessionIdException} and no push sessions remain. A further
     * caught-up fetch with the newer value starts push replication again.
     */
    @Test
    public void testFetchWithNewReplicationSessionTransitionsToPull() {
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));

        // Moving from 0 to 1 in the last argument ends push session 0.
        fetchAsCaughtUpReplica(partition(), 1L, 1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);

        // The older value is fenced from now on.
        Assertions.assertThrows(FencedReplicationSessionIdException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));

        // No push session is registered with the push manager at this point.
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager()
                .map(manager -> manager.pushReplicationSessions())
                .get()).isEmpty());

        // A caught-up fetch with the current value starts a fresh push session.
        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 1L));
        activePushSessionOrFail(remoteReplicaId(), 1L);
    }

    /**
     * Issues the same caught-up fetch twice and expects {@link PushReplicationStartedException}
     * both times, then checks that a push session for (follower, 0) is active.
     */
    @Test
    public void testFollowerFetchAfterTransitionThrowsPushReplicationStartedException() {
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        // First fetch: push replication starts.
        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        // Identical second fetch: the same exception is raised again.
        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));

        activePushSessionOrFail(remoteReplicaId(), 0L);
    }

    /**
     * Seeds the log, lets the follower fetch at offset 5 (high watermark becomes 5), then issues
     * a caught-up fetch that starts push replication. Afterwards the high watermark must equal
     * the log end offset, the push manager must have seen a matching high-watermark event for
     * the follower, and {@code followersNotCatchingUp()} must report 1.
     */
    @Test
    public void testFollowerFetchWithTransitionUpdatesFetchStateAndIncrementsHighWatermark() {
        LogManager manager = logManager();
        AbstractLog log = manager.getOrCreateLog(
                topicPartition(),
                manager.getOrCreateLog$default$2(),
                manager.getOrCreateLog$default$3(),
                None$.MODULE$,
                manager.getOrCreateLog$default$5());
        seedLogWithData(log);
        Partition leader = setupPartitionWithMocks(10, true);
        Assertions.assertEquals(0L, log.highWatermark());

        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        // Plain follower fetch at offset 5 advances the high watermark to 5.
        fetchFollower(
                leader,
                remoteReplicaId(),
                5L,
                0L,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                0L);
        Assertions.assertEquals(5L, log.highWatermark());

        // The caught-up fetch starts push replication and moves the high watermark to the log end.
        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(leader, 1L, 0L));
        Assertions.assertEquals(log.logEndOffset(), log.highWatermark());

        TestUtils.MockPushManager pushManager = (TestUtils.MockPushManager) pushReplicationManager().get();
        Set followerOnly = (Set) Predef$.MODULE$.Set().apply(
                ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())}));
        pushManager.verifyLeaderHwmEvent(topicIdPartition(), followerOnly, log.highWatermark());
        Assertions.assertEquals(1, pushManager.followersNotCatchingUp());
    }

    /**
     * After a follower fetch has registered replication session id 4, checks that a
     * fetch-state update is rejected with {@link FencedLeaderEpochException} when it carries
     * leader epoch 9 (the partition is set up with 10), and with
     * {@link FencedReplicationSessionIdException} when it carries session id 3.
     */
    @Test
    public void testUpdateFollowerFetchStateFencesStaleFetches() {
        LogManager manager = logManager();
        AbstractLog log = manager.getOrCreateLog(
                topicPartition(),
                manager.getOrCreateLog$default$2(),
                manager.getOrCreateLog$default$3(),
                None$.MODULE$,
                manager.getOrCreateLog$default$5());
        seedLogWithData(log);
        Partition leader = setupPartitionWithMocks(10, true);

        final long brokerEpoch = 3;
        final long sessionId = 4;
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(brokerEpoch)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        fetchFollower(
                leader,
                remoteReplicaId(),
                5L,
                0L,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(brokerEpoch)),
                sessionId);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(sessionId)));

        // Leader epoch one below the partition's epoch.
        Assertions.assertThrows(FencedLeaderEpochException.class,
                () -> updateFetchState$1(remoteReplicaId(), Optional.of(Predef$.MODULE$.int2Integer(9)), brokerEpoch, sessionId, leader, log));

        // Matching leader epoch, but session id one below the registered one.
        Assertions.assertThrows(FencedReplicationSessionIdException.class,
                () -> updateFetchState$1(remoteReplicaId(), Optional.of(Predef$.MODULE$.int2Integer(10)), brokerEpoch, sessionId - 1, leader, log));
    }

    /**
     * Invokes {@code onPushSessionEnded()} on push-session handles that have already been
     * superseded and checks the replication session id recorded for the follower afterwards:
     * it stays at 1 and then 2 for the first two stale handles, and is -1 after the third
     * handle is ended following a {@code makeLeader} call with a higher leader epoch.
     */
    @Test
    public void testOnPushSessionEndedForStaleSession() {
        final int leaderEpoch = 5;
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // Session 0 starts, then is replaced when the follower fetches with 1.
        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        PushSession firstSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        fetchAsCaughtUpReplica(partition(), 1L, 1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));

        // Ending the superseded handle leaves the recorded session id untouched.
        firstSession.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));

        // Session 1 starts, then is replaced when the follower fetches with 2.
        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 1L));
        PushSession secondSession = activePushSessionOrFail(remoteReplicaId(), 1L);
        fetchAsCaughtUpReplica(partition(), 1L, 2L);
        verifyPushSessionEnded(remoteReplicaId(), 1L, false);
        secondSession.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(2L)));

        // Session 2 starts and stays active while the leader epoch is bumped.
        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 2L));
        PushSession thirdSession = activePushSessionOrFail(remoteReplicaId(), 2L);

        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState bumpedEpochState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch + 1)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                        new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                        new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setIsNew(false);
        Assertions.assertFalse(
                leader.makeLeader(bumpedEpochState, offsetCheckpoints(), new Some(topicId()), leader.makeLeader$default$4()),
                "Expected become leader not to change leadership");

        // Ending the handle obtained before the epoch bump leaves session id -1 recorded.
        thirdSession.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(-1L)));
    }

    /**
     * Ends the currently active push session (id 0) through its handle and checks that the
     * follower's recorded replication session id moves from 0 to 1 and that its replication
     * mode is {@code PULL}.
     */
    @Test
    public void testOnPushSessionEndedUpdatesReplicationSession() {
        setupPartitionWithMocks(5, true);
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));

        PushSession pushSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));

        // Ending the live session bumps the recorded id and switches the follower to PULL.
        pushSession.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));
        assertReplicationSessionMode(remoteReplicaId(), ReplicationState.Mode.PULL);
    }

    /**
     * Deletes the partition while a push session is active and then ends that session through
     * its handle. The test has no assertion after the final call, so it passes as long as
     * {@code onPushSessionEnded()} does not throw.
     */
    @Test
    public void testOnPushSessionEndedWhenPartitionDeleted() {
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));

        PushSession pushSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));

        partition().delete();
        pushSession.onPushSessionEnded();
    }

    /**
     * Feeds append-records responses into an active push session and checks that the leader's
     * high watermark follows the reported offset (17 to 19) while the low watermark stays 0.
     * It then lets the replica lag timeout elapse, triggers an ISR shrink, completes ISR update
     * 2, reports the log end offset, and expects no ISR updates to be pending.
     */
    @Test
    public void testOnAppendRecordsResponseUpdatesHighWatermark() {
        LogManager manager = logManager();
        AbstractLog log = manager.getOrCreateLog(
                topicPartition(),
                manager.getOrCreateLog$default$2(),
                manager.getOrCreateLog$default$3(),
                None$.MODULE$,
                manager.getOrCreateLog$default$5());
        seedLogWithData(log);
        setupPartitionWithMocks(10, true);
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        PushSession pushSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, log.highWatermark());

        // Five more records on the leader; the follower acknowledges up to offset 19.
        appendToPartition(partition(), 5);
        pushSession.onAppendRecordsResponse(19L, 3L);
        Assertions.assertEquals(19L, log.highWatermark());
        Assertions.assertEquals(0L, partition().lowWatermarkIfLeader());

        // Exceed the lag timeout, shrink the ISR and complete that update.
        time().sleep(partition().replicaLagTimeMaxMs() + 1);
        partition().maybeShrinkIsr();
        alterPartitionManager().completeIsrUpdate(2);

        // Reporting the full log end offset must leave no ISR update queued.
        pushSession.onAppendRecordsResponse(log.logEndOffset(), 3L);
        Assertions.assertTrue(alterPartitionManager().isrUpdates().isEmpty());
    }

    /**
     * Deletes records up to offset 5 on the leader while a push session is active. The delete
     * result reports low watermark 0; once the push session reports (17, 5) via
     * {@code onAppendRecordsResponse}, the leader's low watermark is 5.
     */
    @Test
    public void testOnAppendRecordsResponseUpdatesLowWatermark() {
        LogManager manager = logManager();
        AbstractLog log = manager.getOrCreateLog(
                topicPartition(),
                manager.getOrCreateLog$default$2(),
                manager.getOrCreateLog$default$3(),
                None$.MODULE$,
                manager.getOrCreateLog$default$5());
        seedLogWithData(log);
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        Mockito.when(BoxesRunTime.boxToBoolean(mo67metadataCache().hasAliveBroker(remoteReplicaId())))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        setupPartitionWithMocks(10, true);

        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        PushSession pushSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, log.highWatermark());

        // The delete is accepted for offset 5 but the low watermark is still 0.
        LogDeleteRecordsResult deleteResult = partition().deleteRecordsOnLeader(5L);
        Assertions.assertEquals(5L, deleteResult.requestedOffset());
        Assertions.assertEquals(0L, deleteResult.lowWatermark());

        // After the follower's response carries 5 as its second value, the low watermark is 5.
        pushSession.onAppendRecordsResponse(17L, 5L);
        Assertions.assertEquals(5L, partition().lowWatermarkIfLeader());
    }

    /**
     * Checks that {@code onAppendRecordsResponse} never calls
     * {@code DelayedOperations.checkAndCompleteAll()} by itself, even when the high or low
     * watermark moves, and that an explicit {@code tryCompleteDelayedRequests()} on the push
     * session triggers exactly one such call.
     */
    @Test
    public void testTryCompleteDelayedRequestsCompletesPurgatory() {
        LogManager manager = logManager();
        AbstractLog log = manager.getOrCreateLog(
                topicPartition(),
                manager.getOrCreateLog$default$2(),
                manager.getOrCreateLog$default$3(),
                None$.MODULE$,
                manager.getOrCreateLog$default$5());
        seedLogWithData(log);
        setupPartitionWithMocks(10, true);
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(BoxesRunTime.boxToBoolean(mo67metadataCache().hasAliveBroker(remoteReplicaId())))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        PushSession pushSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, log.highWatermark());
        appendToPartition(partition(), 5);

        // Response at the current high watermark: nothing to complete.
        pushSession.onAppendRecordsResponse(17L, 0L);
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.never())).checkAndCompleteAll();

        // High watermark advances to 19, still no completion check from the response itself.
        pushSession.onAppendRecordsResponse(19L, 0L);
        Assertions.assertEquals(19L, log.highWatermark());
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.never())).checkAndCompleteAll();

        // Start counting interactions afresh.
        Mockito.reset(delayedOperations());
        pushSession.onAppendRecordsResponse(19L, 3L);
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.never())).checkAndCompleteAll();

        // Report one pending delayed delete, delete records, and check the low watermark is 3.
        Mockito.when(BoxesRunTime.boxToInteger(delayedOperations().numDelayedDelete()))
                .thenReturn(BoxesRunTime.boxToInteger(1));
        partition().deleteRecordsOnLeader(5L);
        Assertions.assertEquals(3L, partition().lowWatermarkIfLeader());
        pushSession.onAppendRecordsResponse(19L, 5L);
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.never())).checkAndCompleteAll();

        // Only the explicit request performs the completion check.
        pushSession.tryCompleteDelayedRequests();
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.times(1))).checkAndCompleteAll();
    }

    /**
     * Checks that append-records responses delivered through a push-session handle stop moving
     * the high watermark once that session has been superseded by later follower fetches, or
     * once {@code makeLeader} has been called with a higher leader epoch, while responses on
     * the currently active session still advance it.
     */
    @Test
    public void testOnAppendRecordsResponseWithStalePushSession() {
        final int leaderEpoch = 10;
        LogManager manager = logManager();
        AbstractLog log = manager.getOrCreateLog(
                topicPartition(),
                manager.getOrCreateLog$default$2(),
                manager.getOrCreateLog$default$3(),
                None$.MODULE$,
                manager.getOrCreateLog$default$5());
        seedLogWithData(log);
        setupPartitionWithMocks(leaderEpoch, true);
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        PushSession staleSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, log.highWatermark());

        // While the session is live its response advances the high watermark.
        appendToPartition(partition(), 5);
        Assertions.assertEquals(22L, log.logEndOffset());
        staleSession.onAppendRecordsResponse(19L, 0L);
        Assertions.assertEquals(19L, log.highWatermark());

        // A follower fetch ending in (Some(1), 1) ends session 0; its responses are now ignored.
        fetchFollower(
                partition(),
                remoteReplicaId(),
                17L,
                fetchFollower$default$4(),
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
        staleSession.onAppendRecordsResponse(21L, 3L);
        Assertions.assertEquals(19L, log.highWatermark());

        // Another follower fetch ending in (Some(2), 0): the old handle is still ignored.
        fetchFollower(
                partition(),
                remoteReplicaId(),
                17L,
                fetchFollower$default$4(),
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(2L)),
                0L);
        staleSession.onAppendRecordsResponse(21L, 3L);
        Assertions.assertEquals(19L, log.highWatermark());

        // A new push session starts; responses on its handle are honoured.
        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 2L, 0L));
        PushSession currentSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        appendToPartition(partition(), 5);
        Assertions.assertEquals(27L, log.logEndOffset());
        currentSession.onAppendRecordsResponse(23L, 3L);
        Assertions.assertEquals(23L, log.highWatermark());

        // Bump the leader epoch without changing leadership.
        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState bumpedEpochState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch + 1)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                        new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                        new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setIsNew(false);
        Assertions.assertFalse(
                leader.makeLeader(bumpedEpochState, offsetCheckpoints(), new Some(topicId()), leader.makeLeader$default$4()),
                "Expected become leader not to change leadership");

        // The handle obtained before the epoch bump no longer moves the high watermark.
        currentSession.onAppendRecordsResponse(25L, 3L);
        Assertions.assertEquals(23L, log.highWatermark());
    }

    /**
     * Deletes the partition while a push session is active and then delivers an append-records
     * response through the session handle. Nothing is asserted after that call, so the test
     * passes as long as {@code onAppendRecordsResponse} does not throw.
     */
    @Test
    public void testOnAppendRecordsResponseWithDeletedPartition() {
        LogManager manager = logManager();
        AbstractLog log = manager.getOrCreateLog(
                topicPartition(),
                manager.getOrCreateLog$default$2(),
                manager.getOrCreateLog$default$3(),
                None$.MODULE$,
                manager.getOrCreateLog$default$5());
        seedLogWithData(log);
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        Mockito.when(BoxesRunTime.boxToBoolean(mo67metadataCache().hasAliveBroker(remoteReplicaId())))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        setupPartitionWithMocks(10, true);

        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        PushSession pushSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, log.highWatermark());

        appendToPartition(partition(), 5);
        Assertions.assertEquals(22L, log.logEndOffset());

        partition().delete();
        pushSession.onAppendRecordsResponse(22L, 0L);
    }

    /**
     * Fills {@code abstractLog} with six fixed {@code appendToLog} calls and asserts that the
     * log end offset is 17 afterwards.
     *
     * @param abstractLog the log to append to
     */
    private void seedLogWithData(AbstractLog abstractLog) {
        // One row per appendToLog call: {second argument, third argument}. The third column
        // sums to 17, matching the asserted end offset.
        // NOTE(review): the second argument is presumably a leader epoch - confirm in appendToLog.
        int[][] appendArgs = {
            {0, 2},
            {3, 3},
            {3, 3},
            {4, 5},
            {7, 1},
            {9, 3},
        };
        for (int[] args : appendArgs) {
            appendToLog(abstractLog, args[0], args[1]);
        }
        Assertions.assertEquals(17L, abstractLog.logEndOffset());
    }

    /**
     * Checks which replicas the mock push manager sees leader-append events for. Before any
     * push session exists no replica receives one; once the first follower has switched to push
     * replication, only that follower does, and a second follower that keeps fetching normally
     * never does.
     */
    @Test
    public void testLeaderLogAppendListenerNotifiesPushReplicasForLeaderAppends() {
        final int leaderEpoch = 5;
        final int batchSize = 5;
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // Add a second follower to both the ISR and the replica set.
        int secondFollowerId = remoteReplicaId() + 1;
        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState threeReplicaState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch + 1)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                        new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()),
                                        new $colon.colon(Predef$.MODULE$.int2Integer(secondFollowerId), Nil$.MODULE$)))).asJava())
                .setPartitionEpoch(partition().getPartitionEpoch() + 1)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                        new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()),
                                        new $colon.colon(Predef$.MODULE$.int2Integer(secondFollowerId), Nil$.MODULE$)))).asJava())
                .setIsNew(false);
        Assertions.assertFalse(
                leader.makeLeader(threeReplicaState, offsetCheckpoints(), new Some(topicId()), leader.makeLeader$default$4()),
                "Expected become leader transition not to change leader");

        AbstractLog leaderLog = partition().localLogOrException();
        TestUtils.MockPushManager pushManager = (TestUtils.MockPushManager) pushReplicationManager().get();
        Set allReplicas = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(
                new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId()), Predef$.MODULE$.int2Integer(secondFollowerId)}));
        Set pushFollower = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(
                new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())}));
        Set nonPushReplicas = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(
                new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(secondFollowerId)}));

        // No push session yet: an append notifies nobody.
        appendToPartition(partition(), batchSize);
        pushManager.verifyNoLeaderAppendEvents(topicIdPartition(), allReplicas);
        Assertions.assertEquals(batchSize, leaderLog.logEndOffset());

        // First follower switches to push replication.
        Assertions.assertThrows(PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));

        // The append runs while the verify call's arguments are evaluated; its result is the
        // value the event is checked against.
        pushManager.verifyLeaderAppendEvent(topicIdPartition(), pushFollower, batchSize, appendToPartition(partition(), batchSize));
        pushManager.verifyNoLeaderAppendEvents(topicIdPartition(), nonPushReplicas);
        int endOffsetAfterTwoBatches = batchSize + batchSize;
        Assertions.assertEquals(endOffsetAfterTwoBatches, leaderLog.logEndOffset());

        // The second follower fetches normally at offset 3; the high watermark becomes 3.
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(secondFollowerId))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(3L)));
        fetchFollower(
                partition(),
                secondFollowerId,
                3L,
                fetchFollower$default$4(),
                fetchFollower$default$5(),
                fetchFollower$default$6(),
                fetchFollower$default$7(),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                fetchFollower$default$10(),
                fetchFollower$default$11());
        Assertions.assertEquals(3L, partition().localLogOrException().highWatermark());

        // A third batch still notifies only the push follower.
        pushManager.verifyLeaderAppendEvent(topicIdPartition(), pushFollower, endOffsetAfterTwoBatches, appendToPartition(partition(), batchSize));
        pushManager.verifyNoLeaderAppendEvents(topicIdPartition(), nonPushReplicas);
        Assertions.assertEquals(endOffsetAfterTwoBatches + batchSize, leaderLog.logEndOffset());
    }

    /**
     * Makes this broker leader with a replica set that contains an extra replica
     * ({@code remoteReplicaId() + 1}) which is not in the ISR, starts push replication for the
     * first follower, and then checks, for each value 1..5, that the mock push manager recorded
     * a leader log-start-offset event for that follower only.
     */
    @Test
    public void testLeaderOffsetsListenerNotifiesPushReplicasForLogStartOffsetIncrement() {
        // Id of the extra replica; it appears in setReplicas(...) below but not in setIsr(...).
        int remoteReplicaId = remoteReplicaId() + 1;
        Partition partition = partition();
        Assertions.assertTrue(partition.makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(partition().getPartitionEpoch()).setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(true), offsetCheckpoints(), new Some(topicId()), partition.makeLeader$default$4()), "Expected become leader transition to succeed");
        appendToPartition(partition(), 5);
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(0L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName())).thenReturn(replicaNode());
        // The caught-up fetch switches the first follower to push replication.
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 0L, 0L);
        });
        // Runs a compiler-generated helper once for each value 1..5; the mapped result is discarded.
        // NOTE(review): the helper's body is not visible in this chunk - presumably it advances
        // the leader's log start offset to the given value; confirm against its definition.
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5).map(obj -> {
            return $anonfun$testLeaderOffsetsListenerNotifiesPushReplicasForLogStartOffsetIncrement$2(this, BoxesRunTime.unboxToInt(obj));
        });
        // For each value 1..5: an event for the push follower, none for the leader or the extra replica.
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp(i -> {
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyLeaderLsoEvent(this.topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.remoteReplicaId())})), i);
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyNoLeaderLsoEvents(this.topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        });
    }

    /**
     * Makes this broker leader with a replica set that contains an extra replica
     * ({@code remoteReplicaId() + 1}) which is not in the ISR, starts push replication for the
     * first follower, and advances the high watermark one offset at a time (1..5) through
     * append-records responses. Checks that the mock push manager recorded a leader
     * high-watermark event for each value, for the push follower only.
     */
    @Test
    public void testLeaderOffsetsListenerNotifiesPushReplicasForHighWatermarkIncrement() {
        // Id of the extra replica; it appears in setReplicas(...) below but not in setIsr(...).
        int otherReplicaId = remoteReplicaId() + 1;
        Partition partition = partition();
        Assertions.assertTrue(partition.makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(partition().getPartitionEpoch()).setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(otherReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(true), offsetCheckpoints(), new Some(topicId()), partition.makeLeader$default$4()), "Expected become leader transition to succeed");
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(0L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName())).thenReturn(replicaNode());

        // The caught-up fetch switches the first follower to push replication.
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 0L, 0L);
        });
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderHwmEvent(topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), 0L);
        PushSession activePushSessionOrFail = activePushSessionOrFail(remoteReplicaId(), 0L);

        appendToPartition(partition(), 5);
        // Expected value goes first so that a failure reports expected/actual the right way round.
        Assertions.assertEquals(0L, partition().localLogOrException().highWatermark());

        // Each response moves the high watermark forward by one offset.
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp(i -> {
            activePushSessionOrFail.onAppendRecordsResponse(i, this.partition().logStartOffset());
            Assertions.assertEquals(i, this.partition().localLogOrException().highWatermark());
        });
        // For each value 1..5: an event for the push follower, none for the leader or the extra replica.
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp(i2 -> {
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyLeaderHwmEvent(this.topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.remoteReplicaId())})), i2);
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyNoLeaderHwmEvents(this.topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.brokerId()), Predef$.MODULE$.int2Integer(otherReplicaId)})));
        });
    }

    /**
     * Sets the partition up with {@code false} as the second setup argument, appends records
     * through {@code appendRecordsToFollower} and increments the log start offset, then checks
     * that the mock push manager recorded no leader append, high-watermark or log-start-offset
     * events for the local broker or the remote replica.
     */
    @Test
    public void testLeaderLogAppendListenerIgnoresFollowerEvents() {
        // NOTE(review): `false` presumably makes this broker a follower - confirm in setupPartitionWithMocks.
        setupPartitionWithMocks(5, false);

        // Five records, one per value 1..5, built by a compiler-generated helper.
        MemoryRecords followerRecords = MemoryRecords.withRecords(
                (byte) 2,
                0L,
                Compression.NONE,
                TimestampType.CREATE_TIME,
                -1L,
                (short) -1,
                -1,
                5,
                false,
                (SimpleRecord[]) RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5)
                        .map(obj -> $anonfun$testLeaderLogAppendListenerIgnoresFollowerEvents$1(BoxesRunTime.unboxToInt(obj)))
                        .toArray(ClassTag$.MODULE$.apply(SimpleRecord.class)));
        partition().appendRecordsToFollower(
                0L,
                AppendOrigin.REPLICATION,
                -1L,
                Optional.of(Predef$.MODULE$.int2Integer(5)),
                followerRecords,
                3L,
                Optional.empty());

        TestUtils.MockPushManager pushManager = (TestUtils.MockPushManager) pushReplicationManager().get();
        Set bothReplicas = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(
                new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId())}));

        pushManager.verifyNoLeaderAppendEvents(topicIdPartition(), bothReplicas);
        pushManager.verifyNoLeaderHwmEvents(topicIdPartition(), bothReplicas);

        partition().localLogOrException().maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.LeaderOffsetIncremented);
        pushManager.verifyNoLeaderLsoEvents(topicIdPartition(), bothReplicas);
    }

    /**
     * Checks that both a transactional client append and the following COMMIT end-transaction
     * marker (appended with {@code AppendOrigin.COORDINATOR}) are reported as leader append events
     * for {@code remoteReplicaId()} only.
     *
     * <p>Flow: stub the metadata cache for the remote replica, set up the partition, re-apply
     * leadership at epoch 6 with a third replica ({@code remoteReplicaId() + 1}) in the ISR and
     * replica list, and assert that a caught-up fetch throws
     * {@link PushReplicationStartedException}. After each of the two appends, the test verifies an
     * append event for {@code remoteReplicaId()} at the expected offset, no append events for the
     * local broker or the third replica, and the expected log end offset.
     */
    @Test
    public void testLeaderLogAppendListenerIncludesTransactionMarkerAppends() {
        Mockito.when(mo67metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo67metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName())).thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        // Third replica id; named distinctly so it is not confused with the remoteReplicaId() accessor.
        int secondRemoteReplicaId = remoteReplicaId() + 1;
        Partition partition = partition();
        Assertions.assertFalse(
                partition.makeLeader(
                        new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                                .setControllerEpoch(0)
                                .setLeader(brokerId())
                                .setLeaderEpoch(5 + 1)
                                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(secondRemoteReplicaId), Nil$.MODULE$)))).asJava())
                                .setPartitionEpoch(partition().getPartitionEpoch() + 1)
                                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(secondRemoteReplicaId), Nil$.MODULE$)))).asJava())
                                .setIsNew(false),
                        offsetCheckpoints(),
                        new Some(topicId()),
                        partition.makeLeader$default$4()),
                "Expected become leader transition not to change leader");

        // A caught-up fetch from the remote replica is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });

        // Transactional client append.
        AbstractLog localLogOrException = partition().localLogOrException();
        VerificationGuard maybeStartTransactionVerification = partition().maybeStartTransactionVerification(1L, 0, (short) 0);
        AbstractRecords createTransactionalRecords = createTransactionalRecords(new $colon.colon(new SimpleRecord("k1".getBytes(), "v1".getBytes()), Nil$.MODULE$), 0L, createTransactionalRecords$default$3(), 1L);
        partition().appendRecordsToLeader(createTransactionalRecords, AppendOrigin.CLIENT, -1, RequestLocal$.MODULE$.withThreadConfinedCaching(), time().milliseconds(), maybeStartTransactionVerification);
        int size = CollectionConverters$.MODULE$.IterableHasAsScala(createTransactionalRecords.records()).asScala().size();
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderAppendEvent(topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), 0L, createTransactionalRecords);
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(secondRemoteReplicaId)})));
        Assertions.assertEquals(size, localLogOrException.logEndOffset());

        // COMMIT marker appended with COORDINATOR origin; its append event is expected at offset `size`.
        AbstractRecords withEndTransactionMarker = MemoryRecords.withEndTransactionMarker(1L, (short) 0, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
        Partition partition2 = partition();
        partition2.appendRecordsToLeader(withEndTransactionMarker, AppendOrigin.COORDINATOR, -1, RequestLocal$.MODULE$.withThreadConfinedCaching(), partition2.appendRecordsToLeader$default$5(), partition2.appendRecordsToLeader$default$6());
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderAppendEvent(topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), size, withEndTransactionMarker);
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(secondRemoteReplicaId)})));
        Assertions.assertEquals(size + CollectionConverters$.MODULE$.IterableHasAsScala(withEndTransactionMarker.records()).asScala().size(), localLogOrException.logEndOffset());
    }

    /**
     * Appends one batch holding the records "1".."i2" directly to {@code abstractLog} via
     * {@code appendAsLeader}.
     *
     * @param abstractLog log that receives the batch
     * @param i value used both as the batch's eighth {@code withRecords} argument and as the
     *          second {@code appendAsLeader} argument (presumably the leader epoch; confirm)
     * @param i2 number of records to generate
     */
    private void appendToLog(AbstractLog abstractLog, int i, int i2) {
        IndexedSeq generated = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), i2)
                .map(obj -> $anonfun$appendToLog$1(BoxesRunTime.unboxToInt(obj)));
        SimpleRecord[] payload = (SimpleRecord[]) generated.toArray(ClassTag$.MODULE$.apply(SimpleRecord.class));
        MemoryRecords batch = MemoryRecords.withRecords((byte) 2, 0L, Compression.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, i, false, payload);
        abstractLog.appendAsLeader(
                batch,
                i,
                abstractLog.appendAsLeader$default$3(),
                abstractLog.appendAsLeader$default$4(),
                abstractLog.appendAsLeader$default$5(),
                abstractLog.appendAsLeader$default$6());
    }

    /**
     * Builds one batch holding the records "1".."i" and appends it to {@code partition} through
     * {@code appendRecordsToLeader} with {@code AppendOrigin.CLIENT}.
     *
     * @param partition partition that receives the batch
     * @param i number of records to generate
     * @return the batch that was appended
     */
    private MemoryRecords appendToPartition(Partition partition, int i) {
        IndexedSeq generated = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), i)
                .map(obj -> $anonfun$appendToPartition$1(BoxesRunTime.unboxToInt(obj)));
        SimpleRecord[] payload = (SimpleRecord[]) generated.toArray(ClassTag$.MODULE$.apply(SimpleRecord.class));
        MemoryRecords batch = MemoryRecords.withRecords((byte) 2, 0L, Compression.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, payload);
        partition.appendRecordsToLeader(
                batch,
                AppendOrigin.CLIENT,
                1,
                RequestLocal$.MODULE$.NoCaching(),
                partition.appendRecordsToLeader$default$5(),
                partition.appendRecordsToLeader$default$6());
        return batch;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Performs a follower fetch on behalf of {@code remoteReplicaId()}, using the end offset of
     * the partition's local log as the fetch offset and the partition's current leader epoch.
     *
     * @param partition partition to fetch from
     * @param j value wrapped in {@code Some} and passed as the tenth {@code fetchFollower}
     *          argument (presumably the replication session id; confirm against fetchFollower)
     * @param j2 value passed unchanged as the last {@code fetchFollower} argument
     * @return the read result produced by {@code fetchFollower}
     */
    public LogReadInfo fetchAsCaughtUpReplica(Partition partition, long j, long j2) {
        // Fetching from the log end offset is what makes this replica "caught up".
        long localLogEndOffset = partition.localLogOrException().logEndOffset();
        return fetchFollower(
                partition,
                remoteReplicaId(),
                localLogEndOffset,
                fetchFollower$default$4(),
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(partition.getLeaderEpoch())),
                fetchFollower$default$7(),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(j)),
                j2);
    }

    /**
     * Asserts that the replication session id of replica {@code i}, read from its state snapshot
     * and wrapped in the {@code Option} returned by {@code getReplica}, equals {@code option}.
     *
     * @param i replica id to look up on the partition
     * @param option expected session id
     */
    private void assertReplicationSessionId(int i, Option<Object> option) {
        Option<Object> actualSessionId = partition().getReplica(i).map(
                replica -> BoxesRunTime.boxToLong($anonfun$assertReplicationSessionId$1(replica)));
        Assertions.assertEquals(option, actualSessionId);
    }

    /**
     * Asserts that replica {@code i} exists on the partition and that the replication session
     * state in its state snapshot reports {@code mode}.
     *
     * @param i replica id to look up on the partition
     * @param mode expected replication session mode
     */
    private void assertReplicationSessionMode(int i, ReplicationState.Mode mode) {
        // Fail the test right away when the replica is unknown to the partition.
        Replica replica = (Replica) partition().getReplica(i).getOrElse(
                () -> (Nothing$) Assertions.fail("Expected replica to exist"));
        Assertions.assertEquals(mode, replica.stateSnapshot().replicationSessionState().mode());
    }

    /**
     * Asserts that replica {@code i} is among the partition's remote push replicas, then asks
     * the mock push manager for that replica's active push session.
     *
     * @param i replica id
     * @param j value forwarded to the mock manager's {@code activePushSessionOrFail}
     *          (presumably the expected replication session id; confirm)
     * @return the session returned by the mock manager
     */
    private PushSession activePushSessionOrFail(int i, long j) {
        boolean isPushReplica = partition().remotePushReplicas().contains(Predef$.MODULE$.int2Integer(i));
        Assertions.assertTrue(isPushReplica);

        TestUtils.MockPushManager pushManager = (TestUtils.MockPushManager) pushReplicationManager().get();
        TestUtils.TopicIdPartitionReplica replicaKey = new TestUtils.TopicIdPartitionReplica(topicIdPartition(), i);
        return pushManager.activePushSessionOrFail(replicaKey, j);
    }

    /**
     * Asserts that replica {@code i} is no longer among the partition's remote push replicas and
     * that the mock push manager recorded the end of that replica's push session.
     *
     * <p>Both checks use {@code i}. Previously the mock-manager verification always used
     * {@code remoteReplicaId()}, so a call for any other replica would have verified the wrong one.
     *
     * @param i replica id whose push session should have ended
     * @param j value forwarded to the mock manager's {@code verifyPushSessionEnded}
     *          (presumably the replication session id; confirm)
     * @param z flag forwarded to the mock manager's {@code verifyPushSessionEnded}
     */
    private void verifyPushSessionEnded(int i, long j, boolean z) {
        Assertions.assertFalse(partition().remotePushReplicas().contains(Predef$.MODULE$.int2Integer(i)));
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyPushSessionEnded(new TestUtils.TopicIdPartitionReplica(topicIdPartition(), i), j, z);
    }

    /**
     * Lambda body: waits for a permit on {@code semaphore}, then applies a leadership change at
     * leader epoch {@code i + 1} and partition epoch 2 to the test's partition, with the local
     * broker and the remote replica as both ISR and replica set.
     *
     * @param pushReplicationPartitionTest enclosing test instance
     * @param semaphore gate that must hand out a permit before the transition runs
     * @param z {@code true} to call {@code makeLeader} with the local broker as leader,
     *          {@code false} to call {@code makeFollower} with the remote replica as leader
     * @param i base leader epoch; the transition uses {@code i + 1}
     * @return the result of {@code makeLeader} or {@code makeFollower}
     */
    public static final /* synthetic */ boolean $anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$2(PushReplicationPartitionTest pushReplicationPartitionTest, Semaphore semaphore, boolean z, int i) {
        semaphore.acquire();
        final Partition target = pushReplicationPartitionTest.partition();
        if (!z) {
            LeaderAndIsrRequestData.LeaderAndIsrPartitionState followerState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                    .setControllerEpoch(0)
                    .setLeader(pushReplicationPartitionTest.remoteReplicaId())
                    .setLeaderEpoch(i + 1)
                    .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava())
                    .setPartitionEpoch(2)
                    .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava())
                    .setIsNew(false);
            return target.makeFollower(
                    followerState,
                    pushReplicationPartitionTest.offsetCheckpoints(),
                    new Some(pushReplicationPartitionTest.topicId()),
                    target.makeFollower$default$4());
        }
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState leaderState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(pushReplicationPartitionTest.brokerId())
                .setLeaderEpoch(i + 1)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setIsNew(false);
        return target.makeLeader(
                leaderState,
                pushReplicationPartitionTest.offsetCheckpoints(),
                new Some(pushReplicationPartitionTest.topicId()),
                target.makeLeader$default$4());
    }

    /**
     * Lambda body: reports whether exactly two threads are currently queued on {@code semaphore}
     * (per {@link Semaphore#getQueueLength()}, which is an estimate).
     */
    public static final /* synthetic */ boolean $anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$3(Semaphore semaphore) {
        final int queuedThreads = semaphore.getQueueLength();
        return queuedThreads == 2;
    }

    /** Lambda body: supplies the fixed timeout message for the thread-preparation wait. */
    public static final /* synthetic */ String $anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$4() {
        final String timeoutMessage = "Timed out waiting for threads to prepare.";
        return timeoutMessage;
    }

    /**
     * Lambda body: reports whether exactly two threads are currently queued on {@code semaphore}
     * (per {@link Semaphore#getQueueLength()}, which is an estimate).
     */
    public static final /* synthetic */ boolean $anonfun$testConcurrentCaughtUpFetchAndNewReplicaEpochFetch$3(Semaphore semaphore) {
        final int queuedThreads = semaphore.getQueueLength();
        return queuedThreads == 2;
    }

    /** Lambda body: supplies the fixed timeout message for the thread-preparation wait. */
    public static final /* synthetic */ String $anonfun$testConcurrentCaughtUpFetchAndNewReplicaEpochFetch$4() {
        final String timeoutMessage = "Timed out waiting for threads to prepare.";
        return timeoutMessage;
    }

    /**
     * Lambda body: reports whether exactly two threads are currently queued on {@code semaphore}
     * (per {@link Semaphore#getQueueLength()}, which is an estimate).
     */
    public static final /* synthetic */ boolean $anonfun$testConcurrentCaughtUpFetchAndNewReplicaSessionFetch$3(Semaphore semaphore) {
        final int queuedThreads = semaphore.getQueueLength();
        return queuedThreads == 2;
    }

    /** Lambda body: supplies the fixed timeout message for the thread-preparation wait. */
    public static final /* synthetic */ String $anonfun$testConcurrentCaughtUpFetchAndNewReplicaSessionFetch$4() {
        final String timeoutMessage = "Timed out waiting for threads to prepare.";
        return timeoutMessage;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Local helper: fetches from {@code partition} as {@code remoteReplicaId()} and asserts
     * whether the result carries a diverging epoch.
     *
     * @param i value wrapped in {@code Some} as the seventh {@code fetchFollower} argument
     *          (presumably the last fetched epoch; confirm against fetchFollower)
     * @param j fetch offset (third {@code fetchFollower} argument)
     * @param z expected value of {@code divergingEpoch.isPresent()} on the result
     * @param partition partition to fetch from
     * @param j2 fourth {@code fetchFollower} argument
     * @param i2 value wrapped in {@code Some} as the sixth {@code fetchFollower} argument
     *           (presumably the leader epoch; confirm against fetchFollower)
     * @return the read result, after the diverging-epoch assertion has passed
     */
    public final LogReadInfo read$1(int i, long j, boolean z, Partition partition, long j2, int i2) {
        LogReadInfo readInfo = fetchFollower(
                partition,
                remoteReplicaId(),
                j,
                j2,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(i2)),
                new Some(BoxesRunTime.boxToInteger(i)),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                0L);
        boolean hasDivergingEpoch = readInfo.divergingEpoch.isPresent();
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z), BoxesRunTime.boxToBoolean(hasDivergingEpoch));
        return readInfo;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Local helper: looks up replica {@code i} on {@code partition}, failing the test when it is
     * absent, and updates its follower fetch state. The update uses a fixed
     * {@code LogOffsetMetadata(7L)}, {@code 0L} as the third argument, the current mock time and
     * the end offset of {@code abstractLog}.
     *
     * @param i replica id to update
     * @param optional seventh {@code updateFollowerFetchState} argument, forwarded unchanged
     * @param j sixth {@code updateFollowerFetchState} argument, forwarded unchanged
     * @param j2 last {@code updateFollowerFetchState} argument, forwarded unchanged
     * @param partition partition that owns the replica
     * @param abstractLog log whose end offset is reported
     */
    public final void updateFetchState$1(int i, Optional optional, long j, long j2, Partition partition, AbstractLog abstractLog) {
        Replica follower = (Replica) partition.getReplica(i).getOrElse(
                () -> (Nothing$) Assertions.fail("Expected replica to be defined"));
        partition.updateFollowerFetchState(
                follower,
                new LogOffsetMetadata(7L),
                0L,
                time().milliseconds(),
                abstractLog.logEndOffset(),
                j,
                optional,
                j2);
    }

    /**
     * Lambda body: calls {@code deleteRecordsOnLeader(i)} on the test's partition and returns
     * its result.
     */
    public static final /* synthetic */ LogDeleteRecordsResult $anonfun$testLeaderOffsetsListenerNotifiesPushReplicasForLogStartOffsetIncrement$2(PushReplicationPartitionTest pushReplicationPartitionTest, int i) {
        final Partition target = pushReplicationPartitionTest.partition();
        return target.deleteRecordsOnLeader(i);
    }

    /**
     * Lambda body: builds a {@code SimpleRecord} from the bytes of the decimal string form of
     * {@code i}.
     */
    public static final /* synthetic */ SimpleRecord $anonfun$testLeaderLogAppendListenerIgnoresFollowerEvents$1(int i) {
        final byte[] decimalBytes = Integer.toString(i).getBytes();
        return new SimpleRecord(decimalBytes);
    }

    /**
     * Lambda body: builds a {@code SimpleRecord} from the bytes of the decimal string form of
     * {@code i}.
     */
    public static final /* synthetic */ SimpleRecord $anonfun$appendToLog$1(int i) {
        final byte[] decimalBytes = Integer.toString(i).getBytes();
        return new SimpleRecord(decimalBytes);
    }

    /**
     * Lambda body: builds a {@code SimpleRecord} from the bytes of the decimal string form of
     * {@code i}.
     */
    public static final /* synthetic */ SimpleRecord $anonfun$appendToPartition$1(int i) {
        final byte[] decimalBytes = Integer.toString(i).getBytes();
        return new SimpleRecord(decimalBytes);
    }

    /**
     * Lambda body: reads the replication session id from the replication session state in
     * {@code replica}'s state snapshot.
     */
    public static final /* synthetic */ long $anonfun$assertReplicationSessionId$1(Replica replica) {
        final long sessionId = replica
                .stateSnapshot()
                .replicationSessionState()
                .replicationSessionId();
        return sessionId;
    }
}
