package kafka.server.link;

import java.time.Duration;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.LogCaptureAppender;
import kafka.utils.LogCaptureAppender$;
import kafka.utils.MockTime;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.slf4j.Logger;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.ArrayOps$;
import scala.collection.Set;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkDestConnectionManagerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\tmg\u0001\u0002\u001f>\u0001\u0011CQa\u0013\u0001\u0005\u00021Cqa\u0014\u0001C\u0002\u0013%\u0001\u000b\u0003\u0004V\u0001\u0001\u0006I!\u0015\u0005\b-\u0002\u0011\r\u0011\"\u0003X\u0011\u0019\u0001\u0007\u0001)A\u00051\"9\u0011\r\u0001b\u0001\n\u0013\u0011\u0007B\u00028\u0001A\u0003%1\rC\u0004p\u0001\t\u0007I\u0011B,\t\rA\u0004\u0001\u0015!\u0003Y\u0011\u001d\t\bA1A\u0005\n]CaA\u001d\u0001!\u0002\u0013A\u0006bB:\u0001\u0005\u0004%I\u0001\u001e\u0005\u0007w\u0002\u0001\u000b\u0011B;\t\u000fq\u0004!\u0019!C\u0005{\"9\u0011\u0011\u0002\u0001!\u0002\u0013q\b\"CA\u0006\u0001\t\u0007I\u0011BA\u0007\u0011!\t)\u0002\u0001Q\u0001\n\u0005=\u0001\"CA\f\u0001\t\u0007I\u0011BA\r\u0011!\t\t\u0003\u0001Q\u0001\n\u0005m\u0001\"CA\u0012\u0001\t\u0007I\u0011BA\u0013\u0011!\ti\u0003\u0001Q\u0001\n\u0005\u001d\u0002\"CA\u0018\u0001\t\u0007I\u0011BA\u0019\u0011!\ty\u0004\u0001Q\u0001\n\u0005M\u0002\"CA!\u0001\t\u0007I\u0011BA\"\u0011!\t)\u0006\u0001Q\u0001\n\u0005\u0015\u0003\"CA,\u0001\t\u0007I\u0011BA-\u0011!\t)\u0007\u0001Q\u0001\n\u0005m\u0003\"CA4\u0001\t\u0007I\u0011BA5\u0011!\t\t\t\u0001Q\u0001\n\u0005-\u0004bCAB\u0001\u0001\u0007\t\u0019!C\u0005\u0003\u000bC1\"!$\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002\u0010\"Y\u00111\u0014\u0001A\u0002\u0003\u0005\u000b\u0015BAD\u0011-\ti\n\u0001a\u0001\u0002\u0004%I!a(\t\u0017\u0005\u001d\u0006\u00011AA\u0002\u0013%\u0011\u0011\u0016\u0005\f\u0003[\u0003\u0001\u0019!A!B\u0013\t\t\u000bC\u0006\u00020\u0002\u0001\r\u00111A\u0005\n\u0005E\u0006bCA]\u0001\u0001\u0007\t\u0019!C\u0005\u0003wC1\"a0\u0001\u0001\u0004\u0005\t\u0015)\u0003\u00024\"Y\u0011\u0011\u0019\u0001A\u0002\u0003\u0007I\u0011BAb\u0011-\t)\r\u0001a\u0001\u0002\u0004%I!a2\t\u0017\u0005-\u0007\u00011A\u0001B\u0003&\u00111\u0010\u0005\f\u0003\u001b\u0004\u0001\u0019!a\u0001\n\u0013\ty\rC\u0006\u0002Z\u0002\u0001\r\u00111A\u0005\n\u0005m\u0007bCAp\u0001\u0001\u0007\t\u0011)Q\u0005\u0003#Dq!!9\u0001\t\u0003\t\u0019\u000fC\u0004\u0002|\u0002!\
t!a9\t\u000f\t\u0015\u0001\u0001\"\u0001\u0002d\"9!q\u0002\u0001\u0005\u0002\u0005\r\bb\u0002B\n\u0001\u0011\u0005\u00111\u001d\u0005\b\u0005/\u0001A\u0011AAr\u0011\u001d\u0011Y\u0002\u0001C\u0001\u0003GDqAa\b\u0001\t\u0003\t\u0019\u000fC\u0004\u0003$\u0001!IA!\n\t\u000f\t\r\u0005\u0001\"\u0003\u0003\u0006\"9!q\u0011\u0001\u0005\n\t%\u0005b\u0002BK\u0001\u0011%!q\u0013\u0005\b\u00053\u0003A\u0011\u0002BN\u0011\u001d\u0011)\u000b\u0001C\u0005\u0005OC\u0011Ba1\u0001#\u0003%IA!2\u0003I\rcWo\u001d;fe2Kgn\u001b#fgR\u001cuN\u001c8fGRLwN\\'b]\u0006<WM\u001d+fgRT!AP \u0002\t1Lgn\u001b\u0006\u0003\u0001\u0006\u000baa]3sm\u0016\u0014(\"\u0001\"\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001!\u0012\t\u0003\r&k\u0011a\u0012\u0006\u0002\u0011\u0006)1oY1mC&\u0011!j\u0012\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005i\u0005C\u0001(\u0001\u001b\u0005i\u0014\u0001\u00042s_.,'oQ8oM&<W#A)\u0011\u0005I\u001bV\"A \n\u0005Q{$aC&bM.\f7i\u001c8gS\u001e\fQB\u0019:pW\u0016\u00148i\u001c8gS\u001e\u0004\u0013\u0001\u00037j].t\u0015-\\3\u0016\u0003a\u0003\"!\u00170\u000e\u0003iS!a\u0017/\u0002\t1\fgn\u001a\u0006\u0002;\u0006!!.\u0019<b\u0013\ty&L\u0001\u0004TiJLgnZ\u0001\nY&t7NT1nK\u0002\na\u0001\\5oW&#W#A2\u0011\u0005\u0011dW\"A3\u000b\u0005\u0019<\u0017AB2p[6|gN\u0003\u0002CQ*\u0011\u0011N[\u0001\u0007CB\f7\r[3\u000b\u0003-\f1a\u001c:h\u0013\tiWM\u0001\u0003Vk&$\u0017a\u00027j].LE\rI\u0001\u0010g>,(oY3DYV\u001cH/\u001a:JI\u0006\u00012o\\;sG\u0016\u001cE.^:uKJLE\rI\u0001\u000eI\u0016\u001cHo\u00117vgR,'/\u00133\u0002\u001d\u0011,7\u000f^\"mkN$XM]%eA\u0005AA.\u001b8l\t\u0006$\u0018-F\u0001v!\t1\u00180D\u0001x\u0015\tA\u0018)\u0001\u0002{W&\u0011!p\u001e\u0002\u0010\u00072,8\u000f^3s\u0019&t7\u000eR1uC\u0006IA.\u001b8l\t\u0006$\u0018\rI\u0001\nY&t7\u000e\u0015:paN,\u0012A 
\t\u0004\u007f\u0006\u0015QBAA\u0001\u0015\r\t\u0019\u0001X\u0001\u0005kRLG.\u0003\u0003\u0002\b\u0005\u0005!A\u0003)s_B,'\u000f^5fg\u0006QA.\u001b8l!J|\u0007o\u001d\u0011\u0002\u001f5,G/\u00193bi\u0006l\u0015M\\1hKJ,\"!a\u0004\u0011\u00079\u000b\t\"C\u0002\u0002\u0014u\u0012!d\u00117vgR,'\u000fT5oW6+G/\u00193bi\u0006l\u0015M\\1hKJ\f\u0001#\\3uC\u0012\fG/Y'b]\u0006<WM\u001d\u0011\u0002\u0013M\u001c\u0007.\u001a3vY\u0016\u0014XCAA\u000e!\rq\u0015QD\u0005\u0004\u0003?i$\u0001F\"mkN$XM\u001d'j].\u001c6\r[3ek2,'/\u0001\u0006tG\",G-\u001e7fe\u0002\n1\u0002\\5oW6\u000bg.Y4feV\u0011\u0011q\u0005\t\u0004\u001d\u0006%\u0012bAA\u0016{\t\u00112\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0006t\u0017mZ3s\u00031a\u0017N\\6NC:\fw-\u001a:!\u00035qW\r^<pe.\u001cE.[3oiV\u0011\u00111\u0007\t\u0005\u0003k\tY$\u0004\u0002\u00028)\u0019\u0011\u0011H4\u0002\u000f\rd\u0017.\u001a8ug&!\u0011QHA\u001c\u00055qU\r^<pe.\u001cE.[3oi\u0006qa.\u001a;x_J\\7\t\\5f]R\u0004\u0013\u0001F1e[&tW*\u001a;bI\u0006$\u0018-T1oC\u001e,'/\u0006\u0002\u0002FA!\u0011qIA)\u001b\t\tIE\u0003\u0003\u0002L\u00055\u0013!C5oi\u0016\u0014h.\u00197t\u0015\u0011\ty%a\u000e\u0002\u000b\u0005$W.\u001b8\n\t\u0005M\u0013\u0011\n\u0002\u0015\u0003\u0012l\u0017N\\'fi\u0006$\u0017\r^1NC:\fw-\u001a:\u0002+\u0005$W.\u001b8NKR\fG-\u0019;b\u001b\u0006t\u0017mZ3sA\u00059Q.\u001a;sS\u000e\u001cXCAA.!\u0011\ti&!\u0019\u000e\u0005\u0005}#bAA,K&!\u00111MA0\u0005\u001diU\r\u001e:jGN\f\u0001\"\\3ue&\u001c7\u000fI\u0001\u000eG2|7/\u001a3DY&,g\u000e^:\u0016\u0005\u0005-\u0004CBA7\u0003o\nY(\u0004\u0002\u0002p)!\u0011\u0011OA:\u0003\u001diW\u000f^1cY\u0016T1!!\u001eH\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u0003s\nyGA\u0006BeJ\f\u0017PQ;gM\u0016\u0014\bc\u0001(\u0002~%\u0019\u0011qP\u001f\u0003-\rcWo\u001d;fe2Kgn[!e[&t7\t\\5f]R\fab\u00197pg\u0016$7\t\\5f]R\u001c\b%\u0001\u0006mS:\\7i\u001c8gS\u001e,\"!a\"\u0011\u00079\u000bI)C\u0002\u0002\fv\u0012\u0011c\u00117vgR,'\u000fT5oW\u000e{gNZ5h\u00039a\u0017N\\6D_:4\u0017nZ0%KF$B!!%\u0002\u0018B\u001
9a)a%\n\u0007\u0005UuI\u0001\u0003V]&$\b\"CAM?\u0005\u0005\t\u0019AAD\u0003\rAH%M\u0001\fY&t7nQ8oM&<\u0007%A\u0006mS:\\W*\u001a;sS\u000e\u001cXCAAQ!\rq\u00151U\u0005\u0004\u0003Kk$AE\"mkN$XM\u001d'j].lU\r\u001e:jGN\fq\u0002\\5oW6+GO]5dg~#S-\u001d\u000b\u0005\u0003#\u000bY\u000bC\u0005\u0002\u001a\n\n\t\u00111\u0001\u0002\"\u0006aA.\u001b8l\u001b\u0016$(/[2tA\u0005Y1m\u001c8o\u001b\u0006t\u0017mZ3s+\t\t\u0019\fE\u0002O\u0003kK1!a.>\u0005\u0001\u001aE.^:uKJd\u0015N\\6EKN$8i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0002\u001f\r|gN\\'b]\u0006<WM]0%KF$B!!%\u0002>\"I\u0011\u0011T\u0013\u0002\u0002\u0003\u0007\u00111W\u0001\rG>tg.T1oC\u001e,'\u000fI\u0001\fe\u0016lw\u000e^3BI6Lg.\u0006\u0002\u0002|\u0005y!/Z7pi\u0016\fE-\\5o?\u0012*\u0017\u000f\u0006\u0003\u0002\u0012\u0006%\u0007\"CAMQ\u0005\u0005\t\u0019AA>\u00031\u0011X-\\8uK\u0006#W.\u001b8!\u0003)awnY1m\u0003\u0012l\u0017N\\\u000b\u0003\u0003#\u0004B!a5\u0002V6\u0011\u0011QJ\u0005\u0005\u0003/\fiE\u0001\bD_:4G.^3oi\u0006#W.\u001b8\u0002\u001d1|7-\u00197BI6Lgn\u0018\u0013fcR!\u0011\u0011SAo\u0011%\tIjKA\u0001\u0002\u0004\t\t.A\u0006m_\u000e\fG.\u00113nS:\u0004\u0013!B:fiV\u0003HCAAIQ\ri\u0013q\u001d\t\u0005\u0003S\f90\u0004\u0002\u0002l*!\u0011Q^Ax\u0003\r\t\u0007/\u001b\u0006\u0005\u0003c\f\u00190A\u0004kkBLG/\u001a:\u000b\u0007\u0005U(.A\u0003kk:LG/\u0003\u0003\u0002z\u0006-(A\u0003\"fM>\u0014X-R1dQ\u0006AA/Z1s\t><h\u000eK\u0002/\u0003\u007f\u0004B!!;\u0003\u0002%!!1AAv\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\u000euKN$8i\u001c8oK\u000e$\u0018n\u001c8N_\u0012,w*\u001e;c_VtG\rK\u00020\u0005\u0013\u0001B!!;\u0003\f%!!QBAv\u0005\u0011!Vm\u001d;\u00023Q,7\u000f^\"p]:,7\r^5p]6{G-Z%oE>,h\u000e\u001a\u0015\u0004a\t%\u0011a\b;fgR\u0004VM]:jgR,g\u000e^\"p]:,7\r^5p]\u001a\u000b\u0017\u000e\\;sK\"\u001a\u0011G!\u0003\u0002\u001fQ,7\u000f\u001e*fG>tg-[4ve\u0016D3A\rB\u0005\u0003%!Xm\u001d;QCV\u001cX\rK\u00024\u0005\u0013\tA\u0002^3ti2{w\rT3wK2D3\u0001\u000eB\u0005\u000391XM]5gs2{w\rT3wK2$\u0002\"!%\u0003(\te#\u0011\u000
e\u0005\b\u0005S)\u0004\u0019\u0001B\u0016\u0003\u0015\u0019G.\u0019>{a\u0011\u0011iCa\u0012\u0011\r\t=\"Q\bB\"\u001d\u0011\u0011\tD!\u000f\u0011\u0007\tMr)\u0004\u0002\u00036)\u0019!qG\"\u0002\rq\u0012xn\u001c;?\u0013\r\u0011YdR\u0001\u0007!J,G-\u001a4\n\t\t}\"\u0011\t\u0002\u0006\u00072\f7o\u001d\u0006\u0004\u0005w9\u0005\u0003\u0002B#\u0005\u000fb\u0001\u0001\u0002\u0007\u0003J\t\u001d\u0012\u0011!A\u0001\u0006\u0003\u0011YEA\u0002`IE\nBA!\u0014\u0003TA\u0019aIa\u0014\n\u0007\tEsIA\u0004O_RD\u0017N\\4\u0011\u0007\u0019\u0013)&C\u0002\u0003X\u001d\u00131!\u00118z\u0011\u001d\u0011Y&\u000ea\u0001\u0005;\n\u0001\"\u00199qK:$WM\u001d\t\u0005\u0005?\u0012)'\u0004\u0002\u0003b)\u0019!1M!\u0002\u000bU$\u0018\u000e\\:\n\t\t\u001d$\u0011\r\u0002\u0013\u0019><7)\u00199ukJ,\u0017\t\u001d9f]\u0012,'\u000fC\u0004\u0003lU\u0002\rA!\u001c\u0002\u00175\f\u0007\u0010T8h\u0019\u00164X\r\u001c\t\u0006\r\n=$1O\u0005\u0004\u0005c:%AB(qi&|g\u000e\u0005\u0003\u0003v\t}TB\u0001B<\u0015\u0011\u0011IHa\u001f\u0002\u000b\u00154XM\u001c;\u000b\u0007\tu$.A\u0003tY\u001a$$.\u0003\u0003\u0003\u0002\n]$!\u0002'fm\u0016d\u0017AE2sK\u0006$XM\u0011:pW\u0016\u00148i\u001c8gS\u001e$\u0012!U\u0001\u0017g\u0016$X\u000f]\"p]:,7\r^5p]6\u000bg.Y4feR!\u0011\u0011\u0013BF\u0011\u001d\u0011ii\u000ea\u0001\u0005\u001f\u000babY8o]\u0016\u001cG/[8o\u001b>$W\rE\u0002O\u0005#K1Aa%>\u00059\u0019uN\u001c8fGRLwN\\'pI\u0016\f\u0001c\u0019:fCR,Gj\\2bY\u0006#W.\u001b8\u0015\u0005\u0005E\u0017!E2sK\u0006$XMU3n_R,\u0017\tZ7j]R1\u00111\u0010BO\u0005CCqAa(:\u0001\u0004\t9)\u0001\u0004d_:4\u0017n\u001a\u0005\b\u0005GK\u0004\u0019AAZ\u0003\u001di\u0017M\\1hKJ\f1B]3wKJ\u001cXMT8eKR1!\u0011\u0016B[\u0005\u007f\u0003BAa+\u000326\u0011!Q\u0016\u0006\u0004\u0005_+\u0017a\u00028fi^|'o[\u0005\u0005\u0005g\u0013iKA\u0006SKZ,'o]3O_\u0012,\u0007b\u0002B\\u\u0001\u0007!\u0011X\u0001\u0007]>$W-\u00133\u0011\u0007\u0019\u0013Y,C\u0002\u0003>\u001e\u00131!\u00138u\u0011%\u0011\tM\u000fI\u0001\u0002\u0004\u0011I,A\u0005sKF,Xm\u001d;JI\u00
06)\"/\u001a<feN,gj\u001c3fI\u0011,g-Y;mi\u0012\u0012TC\u0001BdU\u0011\u0011IL!3,\u0005\t-\u0007\u0003\u0002Bg\u0005/l!Aa4\u000b\t\tE'1[\u0001\nk:\u001c\u0007.Z2lK\u0012T1A!6H\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u00053\u0014yMA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\u0004")
/* loaded from: input_file:kafka/server/link/ClusterLinkDestConnectionManagerTest.class */
public class ClusterLinkDestConnectionManagerTest {
    // ---- Immutable fixtures, created once per test instance ----

    // Broker configuration produced by createBrokerConfig() (cluster linking enabled there).
    private final KafkaConfig brokerConfig = createBrokerConfig();
    private final String linkName = "testLink";
    // Fresh random id for every test instance; also passed to every ReverseNode built by reverseNode(...).
    private final Uuid linkId = Uuid.randomUuid();
    private final String sourceClusterId = "sourceCluster";
    private final String destClusterId = "destCluster";
    private final ClusterLinkData linkData = new ClusterLinkData(linkName(), linkId(), new Some(sourceClusterId()), None$.MODULE$, false);
    // Mutable link properties; tests add entries and rebuild linkConfig from them.
    private final Properties linkProps = new Properties();
    // Mockito mock; setUp() stubs isLinkCoordinator(linkName, false) to return true.
    private final ClusterLinkMetadataManager metadataManager = (ClusterLinkMetadataManager) Mockito.mock(ClusterLinkMetadataManager.class);
    // Real scheduler instance; shut down in tearDown().
    private final ClusterLinkScheduler scheduler = new ClusterLinkScheduler();
    private final ClusterLinkManager linkManager = (ClusterLinkManager) Mockito.mock(ClusterLinkManager.class);
    // Mockito mock returned by the mocked remote admin's networkClient().
    private final NetworkClient networkClient = (NetworkClient) Mockito.mock(NetworkClient.class);
    private final AdminMetadataManager adminMetadataManager = new AdminMetadataManager(new LogContext(), 100, 10000);
    // Real metrics registry; closed in tearDown().
    private final Metrics metrics = new Metrics();
    // Remote admin clients whose close() was invoked, appended by $anonfun$createRemoteAdmin$1.
    private final ArrayBuffer<ClusterLinkAdminClient> closedClients = ArrayBuffer$.MODULE$.empty();

    // ---- Mutable state, (re)assigned by setupConnectionManager and the admin factories ----

    private ClusterLinkConfig linkConfig;
    private ClusterLinkMetrics linkMetrics;
    private ClusterLinkDestConnectionManager connManager;
    // Set by createRemoteAdmin(); reset to null when the mocked client's close() is called.
    private ClusterLinkAdminClient remoteAdmin;
    // Set by createLocalAdmin(); reset to null when that client's close(Duration) is called.
    // NOTE(review): the mangled name presumably comes from the Scala compiler exposing a private
    // field to the anonymous MockAdminClient subclass -- confirm against the Scala source.
    private ConfluentAdmin kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin;

    /** Returns the broker configuration built once by {@code createBrokerConfig()}. */
    private KafkaConfig brokerConfig() {
        return brokerConfig;
    }

    /** Returns the fixed link name used by this test ({@code "testLink"}). */
    private String linkName() {
        return linkName;
    }

    /** Returns the random link id generated when this test instance was created. */
    private Uuid linkId() {
        return linkId;
    }

    /** Returns the fixed source cluster id ({@code "sourceCluster"}). */
    private String sourceClusterId() {
        return sourceClusterId;
    }

    /** Returns the fixed destination cluster id ({@code "destCluster"}). */
    private String destClusterId() {
        return destClusterId;
    }

    /** Returns the {@code ClusterLinkData} fixture built from the link name, link id and source cluster id. */
    private ClusterLinkData linkData() {
        return linkData;
    }

    /** Returns the shared, mutable link properties that tests modify before rebuilding the link config. */
    private Properties linkProps() {
        return linkProps;
    }

    /** Returns the mocked {@code ClusterLinkMetadataManager}. */
    private ClusterLinkMetadataManager metadataManager() {
        return metadataManager;
    }

    /** Returns the scheduler instance that {@code tearDown()} shuts down. */
    private ClusterLinkScheduler scheduler() {
        return scheduler;
    }

    /** Returns the mocked {@code ClusterLinkManager}. */
    private ClusterLinkManager linkManager() {
        return linkManager;
    }

    /** Returns the mocked {@code NetworkClient} handed out by the mocked remote admin client. */
    private NetworkClient networkClient() {
        return networkClient;
    }

    /** Returns the {@code AdminMetadataManager} fixture handed out by the mocked remote admin client. */
    private AdminMetadataManager adminMetadataManager() {
        return adminMetadataManager;
    }

    /** Returns the metrics registry that {@code tearDown()} closes. */
    private Metrics metrics() {
        return metrics;
    }

    /** Returns the buffer recording every remote admin client whose {@code close()} has been invoked. */
    private ArrayBuffer<ClusterLinkAdminClient> closedClients() {
        return closedClients;
    }

    /** Returns the most recently built link configuration, or {@code null} if none has been set yet. */
    private ClusterLinkConfig linkConfig() {
        return linkConfig;
    }

    /** Replaces the current link configuration. */
    private void linkConfig_$eq(ClusterLinkConfig clusterLinkConfig) {
        linkConfig = clusterLinkConfig;
    }

    /** Returns the link metrics created by {@code setupConnectionManager}, or {@code null} before that call. */
    private ClusterLinkMetrics linkMetrics() {
        return linkMetrics;
    }

    /** Replaces the current link metrics instance. */
    private void linkMetrics_$eq(ClusterLinkMetrics clusterLinkMetrics) {
        linkMetrics = clusterLinkMetrics;
    }

    /** Returns the connection manager under test, or {@code null} if {@code setupConnectionManager} has not run. */
    private ClusterLinkDestConnectionManager connManager() {
        return connManager;
    }

    /** Replaces the connection manager under test. */
    private void connManager_$eq(ClusterLinkDestConnectionManager clusterLinkDestConnectionManager) {
        connManager = clusterLinkDestConnectionManager;
    }

    /** Returns the current mocked remote admin client, or {@code null} if none exists or it has been closed. */
    private ClusterLinkAdminClient remoteAdmin() {
        return remoteAdmin;
    }

    /** Replaces the tracked remote admin client; {@code null} marks it as closed/absent. */
    private void remoteAdmin_$eq(ClusterLinkAdminClient clusterLinkAdminClient) {
        remoteAdmin = clusterLinkAdminClient;
    }

    /** Returns the current local admin client, or {@code null} if none exists or it has been closed. */
    private ConfluentAdmin localAdmin() {
        return kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin;
    }

    /**
     * Replaces the tracked local admin client; {@code null} marks it as closed/absent.
     * Public so the anonymous admin client created in {@code createLocalAdmin()} can call it.
     */
    public void kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin_$eq(ConfluentAdmin confluentAdmin) {
        kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin = confluentAdmin;
    }

    /** Stubs the mocked metadata manager so {@code isLinkCoordinator(linkName, false)} returns {@code true}. */
    @BeforeEach
    public void setUp() {
        // The mock invocation must immediately precede Mockito.when(...) so it is the call being stubbed.
        boolean stubbedCall = metadataManager().isLinkCoordinator(linkName(), false);
        Mockito.when(BoxesRunTime.boxToBoolean(stubbedCall)).thenReturn(BoxesRunTime.boxToBoolean(true));
    }

    /** Shuts down the connection manager (if one was created), then the scheduler, then closes the metrics. */
    @AfterEach
    public void tearDown() {
        ClusterLinkDestConnectionManager manager = connManager();
        if (manager != null) {
            manager.shutdown();
        }
        scheduler().shutdown();
        metrics().close();
    }

    /**
     * In outbound mode no remote admin is created before or after startup, a reverse connection
     * is rejected with {@link InvalidRequestException}, and no local admin client is created.
     */
    @Test
    public void testConnectionModeOutbound() {
        setupConnectionManager(ConnectionMode$Outbound$.MODULE$);
        Assertions.assertNull(remoteAdmin());

        connManager().startup();
        Assertions.assertNull(remoteAdmin());

        KafkaChannel channel = (KafkaChannel) Mockito.mock(KafkaChannel.class);
        Assertions.assertThrows(
            InvalidRequestException.class,
            () -> connManager().processReverseConnection(channel, reverseNode(1, -1)));

        Assertions.assertNull(localAdmin(), "Local admin client created unnecessarily for outbound dest connection manager");
        connManager().shutdown();
    }

    /**
     * In inbound mode startup creates the remote admin, a first reverse connection is accepted and
     * counted, a second one (node 2, second argument 5) fails with {@link NetworkException} without
     * changing the counts, and shutdown closes the local admin client.
     */
    @Test
    public void testConnectionModeInbound() {
        setupConnectionManager(ConnectionMode$Inbound$.MODULE$);
        Assertions.assertNull(remoteAdmin());

        connManager().startup();
        Assertions.assertNotNull(remoteAdmin());
        boolean providerPresent =
            connManager().reverseConnectionProvider(networkClient(), new Some(adminMetadataManager()), "").nonEmpty();
        Assertions.assertTrue(providerPresent);

        KafkaChannel channel = (KafkaChannel) Mockito.mock(KafkaChannel.class);
        connManager().processReverseConnection(channel, reverseNode(1, -1));
        Assertions.assertEquals(1, connManager().persistentConnectionCount());
        Assertions.assertEquals(1, connManager().reverseConnectionCount());

        ReverseNode secondNode = reverseNode(2, 5);
        Assertions.assertThrows(
            NetworkException.class,
            () -> connManager().processReverseConnection(channel, secondNode));
        Assertions.assertEquals(1, connManager().persistentConnectionCount());
        Assertions.assertEquals(1, connManager().reverseConnectionCount());

        Assertions.assertNotNull(localAdmin(), "Local admin client not created for inbound dest connection manager");
        connManager().shutdown();
        Assertions.assertNull(localAdmin(), "Local admin client was not shutdown");
    }

    /**
     * Verifies that a failure raised by {@code NetworkClient.reverseAndAdd} while processing a
     * reverse connection propagates to the caller as a {@link RuntimeException}, and that
     * {@code reverseAndAdd} was invoked on the mocked network client.
     */
    @Test
    public void testPersistentConnectionFailure() {
        setupConnectionManager(ConnectionMode$Inbound$.MODULE$);
        connManager().startup();
        KafkaChannel kafkaChannel = (KafkaChannel) Mockito.mock(KafkaChannel.class);
        ((KafkaChannel) Mockito.doNothing().when(kafkaChannel)).close();
        ReverseNode reverseNode = reverseNode(1, -1);
        // reverseAndAdd is a void method, so it is stubbed with doThrow(...).when(mock) rather than
        // by invoking it and then calling Mockito.when(...) on a dummy BoxedUnit value.
        Mockito.doThrow(new RuntimeException("Test exception"))
            .when(networkClient())
            .reverseAndAdd((ReverseChannel) ArgumentMatchers.any());
        Assertions.assertThrows(RuntimeException.class, () -> {
            this.connManager().processReverseConnection(kafkaChannel, reverseNode);
        });
        ((NetworkClient) Mockito.verify(networkClient())).reverseAndAdd((ReverseChannel) ArgumentMatchers.any());
    }

    /**
     * Reconfigures an inbound connection manager with a changed {@code metadata.max.age.ms} and checks
     * that the config is applied, the previous remote admin is closed and replaced, and the local
     * admin instance is kept.
     */
    @Test
    public void testReconfigure() {
        setupConnectionManager(ConnectionMode$Inbound$.MODULE$);
        connManager().startup();
        ConfluentAdmin localAdminBefore = localAdmin();
        ClusterLinkAdminClient remoteAdminBefore = remoteAdmin();

        linkProps().setProperty("metadata.max.age.ms", "1000");
        linkConfig_$eq(ClusterLinkConfig$.MODULE$.create(linkProps(), None$.MODULE$, true));
        Set changedKeys = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"metadata.max.age.ms"}));
        connManager().reconfigure(linkConfig(), changedKeys);

        Assertions.assertEquals(connManager().currentConfig().originals(), linkConfig().originals());
        Assertions.assertNotNull(remoteAdmin());
        Assertions.assertNotSame(remoteAdminBefore, remoteAdmin());
        Assertions.assertEquals(new $colon.colon(remoteAdminBefore, Nil$.MODULE$), closedClients().toSeq());
        Assertions.assertSame(localAdminBefore, localAdmin());
    }

    /**
     * Pauses the link via reconfigure and checks that both admin clients are closed, then
     * un-pauses it and checks that both admin clients exist again.
     */
    @Test
    public void testPause() {
        setupConnectionManager(ConnectionMode$Inbound$.MODULE$);
        connManager().startup();
        ClusterLinkAdminClient remoteAdminBefore = remoteAdmin();

        // Pause the link.
        linkProps().setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "true");
        linkConfig_$eq(ClusterLinkConfig$.MODULE$.create(linkProps(), None$.MODULE$, true));
        connManager().reconfigure(
            linkConfig(),
            (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()})));
        Assertions.assertEquals(connManager().currentConfig().originals(), linkConfig().originals());
        Assertions.assertEquals(new $colon.colon(remoteAdminBefore, Nil$.MODULE$), closedClients().toSeq());
        Assertions.assertNull(localAdmin(), "Local admin client was not shutdown");
        Assertions.assertNull(remoteAdmin(), "Remote admin client was not shutdown");

        // Resume the link.
        linkProps().setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "false");
        linkConfig_$eq(ClusterLinkConfig$.MODULE$.create(linkProps(), None$.MODULE$, true));
        connManager().reconfigure(
            linkConfig(),
            (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()})));
        Assertions.assertNotNull(localAdmin(), "Local admin client was not recreated");
        Assertions.assertNotNull(remoteAdmin(), "Remote admin client was not recreated");
    }

    /**
     * Runs {@code verifyLogLevel} with no maximum level, with {@code Some(null)}, and with every
     * {@code org.slf4j.event.Level}, while the class logger is set to {@code ALL} and a capture
     * appender is registered. The appender and the previous logger level are restored afterwards.
     */
    @Test
    public void testLogLevel() {
        Class<?> targetClass = ClusterLinkDestConnectionManager.class;
        Level previousLevel = LogCaptureAppender$.MODULE$.setClassLoggerLevel(targetClass, Level.ALL);
        LogCaptureAppender appender = LogCaptureAppender$.MODULE$.createAndRegister();
        try {
            verifyLogLevel(targetClass, appender, None$.MODULE$);
            verifyLogLevel(targetClass, appender, new Some((Object) null));
            for (org.slf4j.event.Level maxLevel : org.slf4j.event.Level.values()) {
                verifyLogLevel(targetClass, appender, new Some(maxLevel));
            }
        } finally {
            LogCaptureAppender$.MODULE$.unregister(appender);
            LogCaptureAppender$.MODULE$.setClassLoggerLevel(targetClass, previousLevel);
            appender.close();
        }
    }

    /**
     * Logs one message at each of trace/debug/info/warn/error through a logger obtained from a
     * {@link LogContext} and asserts, after each call, the level and text of the last captured event.
     *
     * <p>When {@code option} is a {@code Some}, the {@code LogContext} is built with an
     * {@link AtomicReference} that is set to the option's value (possibly {@code null}) before
     * logging; otherwise the plain prefix-only {@code LogContext} is used. The expected maximum
     * level is the option's value, or {@code ERROR} for {@code None} and {@code Some(null)}.
     *
     * <p>The previous decompiled form ran the assertions inside the {@code Some(null)} branch and
     * then fell through to a second pass with a {@code null} expected level, which failed with a
     * {@link NullPointerException} in {@code assertLastLog$1}; the level is now resolved exactly once.
     *
     * @param cls                class whose logger is requested from the {@code LogContext}
     * @param logCaptureAppender appender from which the last logged event is read
     * @param option             optional maximum log level; {@code Some(null)} is allowed
     * @throws MatchError if {@code option} is neither a {@code Some} nor {@code None}
     */
    private void verifyLogLevel(Class<?> cls, LogCaptureAppender logCaptureAppender, Option<org.slf4j.event.Level> option) {
        AtomicReference<org.slf4j.event.Level> maxLevelRef = new AtomicReference<>();
        LogContext logContext;
        org.slf4j.event.Level expectedMaxLevel;
        if (option instanceof Some) {
            logContext = new LogContext("[TEST] ", maxLevelRef);
            org.slf4j.event.Level requested = ((Some<org.slf4j.event.Level>) option).value();
            // Some(null) is treated like "no maximum": fall back to ERROR.
            expectedMaxLevel = requested != null ? requested : org.slf4j.event.Level.ERROR;
        } else if (None$.MODULE$.equals(option)) {
            logContext = new LogContext("[TEST] ");
            expectedMaxLevel = org.slf4j.event.Level.ERROR;
        } else {
            throw new MatchError(option);
        }
        Logger logger = logContext.logger(cls);
        // Publish the requested level only after the logger has been created, as before.
        option.foreach(requestedLevel -> {
            maxLevelRef.set(requestedLevel);
            return BoxedUnit.UNIT;
        });

        logger.trace("trace message");
        assertLastLog$1(org.slf4j.event.Level.TRACE, "[TEST] trace message", logCaptureAppender, expectedMaxLevel);
        logger.debug("debug message");
        assertLastLog$1(org.slf4j.event.Level.DEBUG, "[TEST] debug message", logCaptureAppender, expectedMaxLevel);
        logger.info("info message");
        assertLastLog$1(org.slf4j.event.Level.INFO, "[TEST] info message", logCaptureAppender, expectedMaxLevel);
        logger.warn("warn message");
        assertLastLog$1(org.slf4j.event.Level.WARN, "[TEST] warn message", logCaptureAppender, expectedMaxLevel);
        logger.error("error message");
        assertLastLog$1(org.slf4j.event.Level.ERROR, "[TEST] error message", logCaptureAppender, expectedMaxLevel);
    }

    /**
     * Builds the broker configuration used by this test: the properties returned by
     * {@code TestUtils.createBrokerConfig} for node 1 with {@code "localhost:1234"}, with the
     * cluster-link enable property set to {@code "true"}.
     *
     * <p>The decompiler had emitted sixteen unused {@code TestUtils$} alias locals here; they have
     * been removed. The argument values and the four {@code RandomPort()} calls are unchanged.
     *
     * @return the resulting {@link KafkaConfig}
     */
    private KafkaConfig createBrokerConfig() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        // Four separate calls are kept, one per port-typed argument, in the original order.
        int randomPort1 = testUtils.RandomPort();
        int randomPort2 = testUtils.RandomPort();
        int randomPort3 = testUtils.RandomPort();
        int randomPort4 = testUtils.RandomPort();
        Properties brokerProps = testUtils.createBrokerConfig(
            1, "localhost:1234", true, true, randomPort1,
            None$.MODULE$, None$.MODULE$, None$.MODULE$,
            true, false, randomPort2, false, randomPort3, false, randomPort4,
            None$.MODULE$, 1, false, 1, (short) 1, false);
        brokerProps.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        return KafkaConfig$.MODULE$.fromProps(brokerProps);
    }

    /**
     * Builds the link config, link metrics and the connection manager under test for the given
     * connection mode. The mode name is stored in the link properties, and
     * {@code bootstrap.servers} is added only for the outbound mode. The metrics are started;
     * the connection manager is created but not started.
     *
     * @param connectionMode connection mode written into the link properties
     */
    private void setupConnectionManager(ConnectionMode connectionMode) {
        Properties props = linkProps();
        props.put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        boolean outbound = ConnectionMode$Outbound$.MODULE$.equals(connectionMode);
        if (outbound) {
            props.put("bootstrap.servers", "localhost:123");
        }
        linkConfig_$eq(ClusterLinkConfig$.MODULE$.create(props, None$.MODULE$, true));

        // NOTE(review): the metrics are always created with the Outbound mode, regardless of
        // connectionMode -- confirm this is intentional.
        ClusterLinkMetrics newMetrics = new ClusterLinkMetrics(
            linkName(), linkId(), LinkMode$Destination$.MODULE$, ConnectionMode$Outbound$.MODULE$,
            false, linkManager(), None$.MODULE$, metrics(), None$.MODULE$);
        linkMetrics_$eq(newMetrics);
        linkMetrics().startup();

        connManager_$eq(new ClusterLinkDestConnectionManager(
            linkData(), linkConfig(), destClusterId(), None$.MODULE$, linkMetrics(),
            (config, manager) -> createRemoteAdmin(config, manager),
            ignored -> createLocalAdmin(),
            metadataManager(), brokerConfig(), new MockTime()));
    }

    /**
     * Creates a {@link MockAdminClient} whose {@code close(Duration)} also clears this test's
     * local-admin reference, stores it as the current local admin, and returns it.
     *
     * <p>JADX note: access modifier was changed from private by the decompiler. The decompiled
     * anonymous class passed a bogus {@code this} constructor argument and kept a synthetic
     * {@code $outer} field assigned from the wrong {@code this}; it now uses the no-arg constructor
     * and a qualified {@code this} to reach the enclosing test.
     *
     * @return the newly created local admin client
     */
    public ConfluentAdmin createLocalAdmin() {
        kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin_$eq(new MockAdminClient() {
            @Override
            public void close(Duration duration) {
                super.close(duration);
                // Lets tests observe that the local admin client was shut down.
                ClusterLinkDestConnectionManagerTest.this
                    .kafka$server$link$ClusterLinkDestConnectionManagerTest$$localAdmin_$eq(null);
            }
        });
        return localAdmin();
    }

    /**
     * Factory passed to the connection manager for remote admin clients. Asserts that it is called
     * with the current link config and with the manager under test, then creates a mocked
     * {@code ClusterLinkAdminClient} whose {@code close()} is recorded by
     * {@code $anonfun$createRemoteAdmin$1} and whose {@code networkClient()} and
     * {@code metadataManager()} return this test's fixtures.
     *
     * <p>JADX note: access modifier was changed from private by the decompiler.
     *
     * @return the newly created mock, which is also stored as the current remote admin
     */
    public ClusterLinkAdminClient createRemoteAdmin(ClusterLinkConfig clusterLinkConfig, ClusterLinkDestConnectionManager clusterLinkDestConnectionManager) {
        Assertions.assertEquals(linkConfig().originals(), clusterLinkConfig.originals());
        Assertions.assertSame(connManager(), clusterLinkDestConnectionManager);

        ClusterLinkAdminClient mockAdmin = (ClusterLinkAdminClient) Mockito.mock(ClusterLinkAdminClient.class);
        remoteAdmin_$eq(mockAdmin);

        ((ClusterLinkAdminClient) Mockito.doAnswer(invocationOnMock -> {
            $anonfun$createRemoteAdmin$1(this, invocationOnMock);
            return BoxedUnit.UNIT;
        }).when(mockAdmin)).close();
        Mockito.when(mockAdmin.networkClient()).thenReturn(networkClient());
        Mockito.when(mockAdmin.metadataManager()).thenReturn(adminMetadataManager());
        return mockAdmin;
    }

    /**
     * Builds a {@code ReverseNode} for this test's link id. {@code i} is used for the first two
     * constructor arguments and for the host name {@code "host" + i}; the port is 1234, the
     * listener is the PLAINTEXT one and the principal is anonymous.
     *
     * @param i  value used for the first two constructor arguments and the host-name suffix
     * @param i2 value passed through as the constructor argument following the link id
     */
    private ReverseNode reverseNode(int i, int i2) {
        String host = "host" + i;
        ListenerName listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
        return new ReverseNode(i, i, host, 1234, linkId(), i2, listener, KafkaPrincipal.ANONYMOUS, Optional.empty(), (AuthenticationContext) null);
    }

    /**
     * Returns {@code -1}.
     * NOTE(review): by its name this is presumably the compiler-generated default for the second
     * parameter of {@code reverseNode} -- confirm against the Scala source.
     */
    private int reverseNode$default$2() {
        final int defaultValue = -1;
        return defaultValue;
    }

    /** Synthetic lambda body: runs {@code verifyLogLevel} on the given test with {@code Some(level)}. */
    public static final /* synthetic */ void $anonfun$testLogLevel$1(ClusterLinkDestConnectionManagerTest clusterLinkDestConnectionManagerTest, Class cls, LogCaptureAppender logCaptureAppender, org.slf4j.event.Level level) {
        Option<org.slf4j.event.Level> maxLevel = new Some<>(level);
        clusterLinkDestConnectionManagerTest.verifyLogLevel(cls, logCaptureAppender, maxLevel);
    }

    /**
     * Asserts that the last event captured by the appender has message {@code str} and a level
     * whose name equals whichever of {@code level} and {@code level2} has the smaller
     * {@code toInt()} value ({@code level} when they are equal).
     *
     * @param level  level at which the message was logged
     * @param str    expected message text
     * @param level2 expected maximum level
     */
    private static final void assertLastLog$1(org.slf4j.event.Level level, String str, LogCaptureAppender logCaptureAppender, org.slf4j.event.Level level2) {
        LoggingEvent lastEvent = (LoggingEvent) logCaptureAppender.getMessages().last();
        org.slf4j.event.Level expectedLevel;
        if (level2.toInt() < level.toInt()) {
            expectedLevel = level2;
        } else {
            expectedLevel = level;
        }
        Assertions.assertEquals(expectedLevel.name(), lastEvent.getLevel().toString());
        Assertions.assertEquals(str, lastEvent.getMessage());
    }

    /**
     * Synthetic lambda body used as the answer for the mocked remote admin's {@code close()}:
     * records the test's current remote admin in {@code closedClients} and clears the reference.
     * The invocation argument is not used.
     */
    public static final /* synthetic */ void $anonfun$createRemoteAdmin$1(ClusterLinkDestConnectionManagerTest clusterLinkDestConnectionManagerTest, InvocationOnMock invocationOnMock) {
        ArrayBuffer<ClusterLinkAdminClient> closed = clusterLinkDestConnectionManagerTest.closedClients();
        ClusterLinkAdminClient closingClient = clusterLinkDestConnectionManagerTest.remoteAdmin();
        closed.$plus$eq(closingClient);
        clusterLinkDestConnectionManagerTest.remoteAdmin_$eq(null);
    }
}
