package kafka.server.link;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.CoreUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.utils.Time;
import org.slf4j.event.Level;
import scala.$less$colon$less$;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

/* compiled from: ClusterLinkInboundConnectionManager.scala */
@ScalaSignature(bytes = "\u0006\u0005\t}x!\u0002\u001b6\u0011\u0003ad!\u0002 6\u0011\u0003y\u0004\"\u0002$\u0002\t\u00039\u0005b\u0002%\u0002\u0005\u0004%\t!\u0013\u0005\u0007-\u0006\u0001\u000b\u0011\u0002&\u0007\ty*\u0004a\u0016\u0005\nE\u0016\u0011\t\u0011)A\u0005G&D\u0001B[\u0003\u0003\u0002\u0003\u0006Ia\u001b\u0005\n]\u0016\u0011\t\u0011)A\u0005_jD\u0001b_\u0003\u0003\u0002\u0003\u0006I\u0001 \u0005\u000b\u0003+)!\u0011!Q\u0001\n\u0005]\u0001BCA\u000f\u000b\t\u0005\t\u0015!\u0003\u0002 !Q\u0011QF\u0003\u0003\u0002\u0003\u0006I!a\f\t\u0015\u0005\u0005SA!A!\u0002\u0013\t\u0019\u0005\u0003\u0006\u0002J\u0015\u0011\t\u0011)A\u0005\u0003\u0017B!\"a\u0015\u0006\u0005\u0003\u0005\u000b\u0011BA+\u0011\u00191U\u0001\"\u0001\u0002f!I\u00111P\u0003C\u0002\u0013%\u0011Q\u0010\u0005\t\u0003'+\u0001\u0015!\u0003\u0002��!A\u0011QS\u0003C\u0002\u0013%\u0011\nC\u0004\u0002\u0018\u0016\u0001\u000b\u0011\u0002&\t\u0011\u0005eUA1A\u0005\n%Cq!a'\u0006A\u0003%!\n\u0003\u0005\u0002\u001e\u0016\u0011\r\u0011\"\u0003J\u0011\u001d\ty*\u0002Q\u0001\n)C\u0011\"!)\u0006\u0001\u0004%I!a)\t\u0013\u0005\u001dV\u00011A\u0005\n\u0005%\u0006\u0002CA[\u000b\u0001\u0006K!!*\t\u0013\u0005}VA1A\u0005\n\u0005\u0005\u0007\u0002CAh\u000b\u0001\u0006I!a1\t\u0013\u0005EWA1A\u0005\n\u0005M\u0007\u0002CAq\u000b\u0001\u0006I!!6\t\u000f\u0005\rX\u0001\"\u0011\u0002f\"9\u0011q]\u0003\u0005B\u0005%\b\u0002\u0003B\u0003\u000b\u0011\u0005QGa\u0002\t\u000f\t5R\u0001\"\u0011\u00030!9!QI\u0003\u0005B\t\u001d\u0003b\u0002B>\u000b\u0011%!Q\u0010\u0005\b\u0005/+A\u0011\u0002BM\u0011\u001d\u0011y+\u0002C!\u0005cCqA!0\u0006\t\u0003\n)\u000fC\u0004\u0003@\u0016!I!!:\t\u000f\t\u0005W\u0001\"\u0015\u0002f\"9!1Y\u0003\u0005R\u0005\u0015\bb\u0002Bc\u000b\u0011%!q\u0019\u0005\b\u0005\u0013,A\u0011BAs\u0011\u001d\u0011Y-\u0002C\u0001\u0005\u001bDqA!5\u0006\t\u0003\u0012\u0019\u000eC\u0004\u0003V\u0016!\tEa5\t\u000f\t]W\u0001\"\u0011\u0003Z\"i!q_\u0003\u0011\u0002\u0007\u0005\t\u0011\"\u0003\u0003zjDQBa?\u0006!\u0003\r\t\u0011
!C\u0005\u0005{L\u0017aI\"mkN$XM\u001d'j].LeNY8v]\u0012\u001cuN\u001c8fGRLwN\\'b]\u0006<WM\u001d\u0006\u0003m]\nA\u0001\\5oW*\u0011\u0001(O\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003i\nQa[1gW\u0006\u001c\u0001\u0001\u0005\u0002>\u00035\tQGA\u0012DYV\u001cH/\u001a:MS:\\\u0017J\u001c2pk:$7i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0014\u0005\u0005\u0001\u0005CA!E\u001b\u0005\u0011%\"A\"\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0015\u0013%AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002y\u0005!b*\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012,\u0012A\u0013\t\u0003\u0017Rk\u0011\u0001\u0014\u0006\u0003\u001b:\u000ba!\u0019;p[&\u001c'BA(Q\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003#J\u000bA!\u001e;jY*\t1+\u0001\u0003kCZ\f\u0017BA+M\u00055\tEo\\7jG&sG/Z4fe\u0006)b*\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\u00043cA\u0003Y7B\u0011Q(W\u0005\u00035V\u0012Ad\u00117vgR,'\u000fT5oW\u000e{gN\\3di&|g.T1oC\u001e,'\u000f\u0005\u0002]?:\u0011Q(X\u0005\u0003=V\n!c\u00117vgR,'\u000fT5oW\u001a\u000b7\r^8ss&\u0011\u0001-\u0019\u0002\u0019\u0013:\u0014w.\u001e8e\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014(B\u000106\u0003!a\u0017N\\6ECR\f\u0007C\u00013h\u001b\u0005)'B\u00014:\u0003\tQ8.\u0003\u0002iK\ny1\t\\;ti\u0016\u0014H*\u001b8l\t\u0006$\u0018-\u0003\u0002c3\u0006i\u0011N\\5uS\u0006d7i\u001c8gS\u001e\u0004\"!\u00107\n\u00055,$!E\"mkN$XM\u001d'j].\u001cuN\u001c4jO\u0006\u0019Bn\\2bY2{w-[2bY\u000ecWo\u001d;feB\u0011\u0001o\u001e\b\u0003cV\u0004\"A\u001d\"\u000e\u0003MT!\u0001^\u001e\u0002\rq\u0012xn\u001c;?\u0013\t1()\u0001\u0004Qe\u0016$WMZ\u0005\u0003qf\u0014aa\u0015;sS:<'B\u0001<C\u0013\tq\u0017,A\tdY&,g\u000e^%oi\u0016\u00148-\u001a9u_J\u00042!Q?��\u0013\tq(I\u0001\u0004PaRLwN\u001c\t\u0005\u0003\u0003\t\t\"\u0004\u0002\u0002\u0004)!\u0011QAA\u0004\u0003\u001d\u0019G.[3oiNT1AOA\u0005\u0015\u0011\tY!!\u0004\u0002\r\u0005\u0004\u0018m\u00195f\u0015\t\ty!A\u0002pe\u001eLA!a\u0005\u0002\u0004\t\t2\t\\5f]RLe\u000e^3sG\u0016\u0004Ho\u001c:\u0002\u000f5,GO]5dgB\u0019Q(!\u0007\n\u0007\
u0005mQG\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\u0018A\u0005:f[>$X-\u00113nS:4\u0015m\u0019;pef\u0004\u0002\"QA\u0011W\u0006\u0015\u0012qE\u0005\u0004\u0003G\u0011%!\u0003$v]\u000e$\u0018n\u001c83!\tiT\u0001E\u0002>\u0003SI1!a\u000b6\u0005Y\u0019E.^:uKJd\u0015N\\6BI6Lgn\u00117jK:$\u0018!\u00067pG\u0006d7i\u001c8o\u0003\u0012l\u0017N\u001c$bGR|'/\u001f\t\u0007\u0003\u0006Er.!\u000e\n\u0007\u0005M\"IA\u0005Gk:\u001cG/[8ocA!\u0011qGA\u001f\u001b\t\tID\u0003\u0003\u0002<\u0005\r\u0011!B1e[&t\u0017\u0002BA \u0003s\u0011\u0001cS1gW\u0006\fE-\\5o\u00072LWM\u001c;\u0002\u001f5,G/\u00193bi\u0006l\u0015M\\1hKJ\u00042!PA#\u0013\r\t9%\u000e\u0002\u001b\u00072,8\u000f^3s\u0019&t7.T3uC\u0012\fG/Y'b]\u0006<WM]\u0001\rEJ|7.\u001a:D_:4\u0017n\u001a\t\u0005\u0003\u001b\ny%D\u00018\u0013\r\t\tf\u000e\u0002\f\u0017\u000647.Y\"p]\u001aLw-\u0001\u0003uS6,\u0007\u0003BA,\u0003Cj!!!\u0017\u000b\t\u0005m\u0013QL\u0001\u0006kRLGn\u001d\u0006\u0005\u0003?\n9!\u0001\u0004d_6lwN\\\u0005\u0005\u0003G\nIF\u0001\u0003US6,GCFA\u0013\u0003O\nI'a\u001b\u0002n\u0005=\u0014\u0011OA:\u0003k\n9(!\u001f\t\u000b\t\u0004\u0002\u0019A2\t\u000b)\u0004\u0002\u0019A6\t\u000b9\u0004\u0002\u0019A8\t\u000bm\u0004\u0002\u0019\u0001?\t\u000f\u0005U\u0001\u00031\u0001\u0002\u0018!9\u0011Q\u0004\tA\u0002\u0005}\u0001bBA\u0017!\u0001\u0007\u0011q\u0006\u0005\b\u0003\u0003\u0002\u0002\u0019AA\"\u0011\u001d\tI\u0005\u0005a\u0001\u0003\u0017Bq!a\u0015\u0011\u0001\u0004\t)&\u0001\nd_:tWm\u0019;j_:\u0014V-];fgR\u001cXCAA@!!\t\t)a!\u0002\b\u00065U\"\u0001(\n\u0007\u0005\u0015eJA\tD_:\u001cWO\u001d:f]RD\u0015m\u001d5NCB\u00042!QAE\u0013\r\tYI\u0011\u0002\u0004\u0013:$\bcA\u001f\u0002\u0010&\u0019\u0011\u0011S\u001b\u0003\u001bI+g/\u001a:tK\u000ec\u0017.\u001a8u\u0003M\u0019wN\u001c8fGRLwN\u001c*fcV,7\u000f^:!\u0003QqW\r\u001f;SKZ,'o]3SKF,Xm\u001d;JI\u0006)b.\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\u0004\u0013!\u00069feNL7\u000f^3oi\u000e{gN\\3di&|gn]\u0001\u0017a\u0016\u00148/[:uK:$8i\u001c8oK\u000e$\u00
18n\u001c8tA\u0005A\u0012m\u0019;jm\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\:\u00023\u0005\u001cG/\u001b<f%\u00164XM]:f\u0007>tg.Z2uS>t7\u000fI\u0001\u0017e\u00164XM]:f\u0007>tg.Z2uS>t\u0017\tZ7j]V\u0011\u0011Q\u0015\t\u0005\u0003v\fi)\u0001\u000esKZ,'o]3D_:tWm\u0019;j_:\fE-\\5o?\u0012*\u0017\u000f\u0006\u0003\u0002,\u0006E\u0006cA!\u0002.&\u0019\u0011q\u0016\"\u0003\tUs\u0017\u000e\u001e\u0005\n\u0003gS\u0012\u0011!a\u0001\u0003K\u000b1\u0001\u001f\u00132\u0003]\u0011XM^3sg\u0016\u001cuN\u001c8fGRLwN\\!e[&t\u0007\u0005K\u0002\u001c\u0003s\u00032!QA^\u0013\r\tiL\u0011\u0002\tm>d\u0017\r^5mK\u0006qAn\\2bY\u000e{gN\\!e[&tWCAAb!\u0015i\u0014QYAe\u0013\r\t9-\u000e\u0002\r\u0019\u0006T\u0018PU3t_V\u00148-\u001a\t\u0005\u0003o\tY-\u0003\u0003\u0002N\u0006e\"AD\"p]\u001adW/\u001a8u\u0003\u0012l\u0017N\\\u0001\u0010Y>\u001c\u0017\r\\\"p]:\fE-\\5oA\u0005q\u0002/\u001a:tSN$XM\u001c;D_:tWm\u0019;j_:\u001cv.\u001e:dK2{7m[\u000b\u0003\u0003+\u0004B!a6\u0002^6\u0011\u0011\u0011\u001c\u0006\u0004\u00037\u0014\u0016\u0001\u00027b]\u001eLA!a8\u0002Z\n1qJ\u00196fGR\fq\u0004]3sg&\u001cH/\u001a8u\u0007>tg.Z2uS>t7k\\;sG\u0016dunY6!\u0003\u001d\u0019H/\u0019:ukB$\"!a+\u0002#\u0015t\u0017M\u00197f\u00072,8\u000f^3s\u0019&t7\u000e\u0006\u0004\u0002,\u0006-\u0018Q\u001f\u0005\b\u0003[\f\u0003\u0019AAx\u00035qW\r^<pe.\u001cE.[3oiB\u0019Q(!=\n\u0007\u0005MXG\u0001\rDYV\u001cH/\u001a:MS:\\g*\u001a;x_J\\7\t\\5f]RDq!!\u0011\"\u0001\u0004\t9\u0010\u0005\u0003B{\u0006e\b\u0003BA~\u0005\u0003i!!!@\u000b\t\u0005}\u0018\u0011H\u0001\nS:$XM\u001d8bYNLAAa\u0001\u0002~\n!\u0012\tZ7j]6+G/\u00193bi\u0006l\u0015M\\1hKJ\f\u0011D]3wKJ\u001cXmQ8o]\u0016\u001cG/[8o!J|g/\u001b3feRA!\u0011\u0002B\u0010\u0005O\u0011I\u0003\u0005\u0003B{\n-\u0001\u0003\u0002B\u0007\u00053qAAa\u0004\u0003\u00165\u0011!\u0011\u0003\u0006\u0005\u0005'\ti&A\u0004oKR<xN]6\n\t\t]!\u0011C\u0001\f%\u00164XM]:f\u001d>$W-\u0003\u0003\u0003\u001c\tu!AE\"p]:,7\r^5p]B\u0013xN^5eKJTAAa\u0006\u0003\u0012!9\u0011Q\u001e\u0012A\u0002\t\u0005
\u0002\u0003BA\u0001\u0005GIAA!\n\u0002\u0004\tia*\u001a;x_J\\7\t\\5f]RDq!!\u0011#\u0001\u0004\t9\u0010\u0003\u0004\u0003,\t\u0002\ra\\\u0001\tG2LWM\u001c;JI\u0006A\u0002O]8dKN\u001c(+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8\u0015\r\u0005-&\u0011\u0007B\u001e\u0011\u001d\u0011\u0019d\ta\u0001\u0005k\tqa\u00195b]:,G\u000e\u0005\u0003\u0003\u0010\t]\u0012\u0002\u0002B\u001d\u0005#\u0011AbS1gW\u0006\u001c\u0005.\u00198oK2DqA!\u0010$\u0001\u0004\u0011y$A\u0006sKZ,'o]3O_\u0012,\u0007\u0003\u0002B\b\u0005\u0003JAAa\u0011\u0003\u0012\tY!+\u001a<feN,gj\u001c3f\u0003iIg.\u001b;jCR,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t)\u0019\u0011IE!\u0019\u0003rA1!1\nB)\u0005+j!A!\u0014\u000b\u0007\t=#)\u0001\u0006d_2dWm\u0019;j_:LAAa\u0015\u0003N\t\u00191+Z9\u0011\r\u0005\u0005%q\u000bB.\u0013\r\u0011IF\u0014\u0002\u0012\u0007>l\u0007\u000f\\3uC\ndWMR;ukJ,\u0007\u0003BAl\u0005;JAAa\u0018\u0002Z\n!ak\\5e\u0011\u001d\u0011\u0019\u0007\na\u0001\u0005K\n\u0011$\u001b8ji&\fG/Z\"p]:,7\r^5p]J+\u0017/^3tiB!!q\rB7\u001b\t\u0011IG\u0003\u0003\u0003l\u0005u\u0013\u0001\u0003:fcV,7\u000f^:\n\t\t=$\u0011\u000e\u0002\"\u0013:LG/[1uKJ+g/\u001a:tK\u000e{gN\\3di&|gn\u001d*fcV,7\u000f\u001e\u0005\b\u0005g\"\u0003\u0019\u0001B;\u00039\u0011X-];fgR\u001cuN\u001c;fqR\u0004BAa\u001a\u0003x%!!\u0011\u0010B5\u00059\u0011V-];fgR\u001cuN\u001c;fqR\f\u0001D]3rk\u0016\u001cHOU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o))\tYKa 
\u0003\u0004\n\u001d%1\u0013\u0005\b\u0005\u0003+\u0003\u0019AAD\u0003%\u0011X-];fgRLE\rC\u0004\u0003\u0006\u0016\u0002\r!!$\u0002\r\rd\u0017.\u001a8u\u0011\u001d\u0011I)\na\u0001\u0005\u0017\u000b!b]8ve\u000e,gj\u001c3f!\u0011\u0011iIa$\u000e\u0005\u0005u\u0013\u0002\u0002BI\u0003;\u0012AAT8eK\"9!QS\u0013A\u0002\u0005\u001d\u0015\u0001\u00043fgR\u0014%o\\6fe&#\u0017A\u00074pe^\f'\u000f\u001a+p%\u0016lw\u000e^3D_>\u0014H-\u001b8bi>\u0014HCBAV\u00057\u0013Y\u000bC\u0004\u0003\u001e\u001a\u0002\rAa(\u0002\u0017I,\u0017/^3ti\u0012\u000bG/\u0019\t\u0005\u0005C\u00139+\u0004\u0002\u0003$*!!QUA/\u0003\u001diWm]:bO\u0016LAA!+\u0003$\n)\u0013J\\5uS\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8ogJ+\u0017/^3ti\u0012\u000bG/\u0019\u0005\b\u0005[3\u0003\u0019\u0001B%\u0003\u001d1W\u000f^;sKN\f!c\u001c8D_:$(o\u001c7mKJ\u001c\u0005.\u00198hKR!\u00111\u0016BZ\u0011\u001d\u0011)l\na\u0001\u0005o\u000b!#[:BGRLg/Z\"p]R\u0014x\u000e\u001c7feB\u0019\u0011I!/\n\u0007\tm&IA\u0004C_>dW-\u00198\u0002G=tG*\u001b8l\u001b\u0016$\u0018\rZ1uCB\u000b'\u000f^5uS>tG*Z1eKJ\u001c\u0005.\u00198hK\u0006iR.Y=cKB\u0013xnY3tg\u000e{wN\u001d3j]\u0006$xN]\"iC:<W-A\u000edY>\u001cXMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0003\u0012l\u0017N\\\u0001\u001dGJ,\u0017\r^3SKZ,'o]3D_:tWm\u0019;j_:\fE-\\5o\u0003Q\u0019'/Z1uK2{7-\u00197D_:t\u0017\tZ7j]R\u0011\u0011\u0011Z\u0001\u0017[\u0006L(-Z\"sK\u0006$XMU3n_R,\u0017\tZ7j]\u00069\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8DY&,g\u000e^\u000b\u0003\u0005\u001f\u0004B!Q?\u0003\"\u0005I\u0002/\u001a:tSN$XM\u001c;D_:tWm\u0019;j_:\u001cu.\u001e8u+\t\t9)\u0001\fsKZ,'o]3D_:tWm\u0019;j_:\u001cu.\u001e8u\u00035a\u0017M_=SKN|WO]2fgV\u0011!1\u001c\t\u0007\u0005\u0017\u0012\tF!81\t\t}'Q\u001d\t\u0006{\u0005\u0015'\u0011\u001d\t\u0005\u0005G\u0014)\u000f\u0004\u0001\u0005\u0017\t\u001d\u0018'!A\u0001\u0002\u000b\u0005!\u0011\u001e\u0002\u0004?\u0012\n\u0014\u0003\u0002Bv\u0005c\u00042!\u0011Bw\u0013\r\u0011yO\u0011\u0002\b\u001d>$\b.\u001b8h!\r\t%1_\u0005\u0004\u0005k\u0014%aA!os\u0006
I2/\u001e9fe\u0012bwnY1m\u0019><\u0017nY1m\u00072,8\u000f^3s+\u0005y\u0017AD:va\u0016\u0014H\u0005\\5oW\u0012\u000bG/Y\u000b\u0002G\u0002")
/* loaded from: input_file:kafka/server/link/ClusterLinkInboundConnectionManager.class */
public class ClusterLinkInboundConnectionManager extends ClusterLinkConnectionManager implements ClusterLinkFactory.InboundConnectionManager {
    /** Link configuration supplied at construction; {@code startup()} reads its connection mode. */
    private final ClusterLinkConfig initialConfig;
    /** Optional client interceptor; handed to the network client (as null when absent) in {@code enableClusterLink}. */
    private final Option<ClientInterceptor> clientInterceptor;
    /** Link metrics; the reverse-connection created/closed sensors are recorded in {@code processReverseConnection}. */
    private final ClusterLinkMetrics metrics;
    // NOTE(review): not referenced in the visible part of this class; presumably builds the remote admin client - confirm.
    private final Function2<ClusterLinkConfig, ClusterLinkInboundConnectionManager, ClusterLinkAdminClient> remoteAdminFactory;
    // NOTE(review): not referenced in the visible part of this class; presumably backs localConnAdmin - confirm.
    private final Function1<String, KafkaAdminClient> localConnAdminFactory;
    /** Broker configuration; its broker id is used as the destination broker id of reverse connection requests. */
    private final KafkaConfig brokerConfig;
    /** Clock; supplies the timestamp passed to {@code bootstrapWithReverseChannel}. */
    private final Time time;
    /** Outstanding reverse connection requests: boxed request id -> the client that asked for the connection. */
    private final ConcurrentHashMap<Object, ReverseClient> connectionRequests;
    /** Counter incremented to obtain the request id of each new reverse connection request. */
    private final AtomicInteger nextReverseRequestId;
    /** Number of persistent reverse connections (those arriving without a request id); updated under persistentConnectionSourceLock. */
    private final AtomicInteger persistentConnections;
    /** Number of reverse connections currently open; incremented on accept and decremented by the disconnect callback. */
    private final AtomicInteger activeReverseConnections;
    /** Client that owns persistent reverse connections, if any; volatile because it is read without holding a lock. */
    private volatile Option<ReverseClient> reverseConnectionAdmin;
    /** Lazily created admin client used to send initiate-reverse-connection requests to the local link coordinator. */
    private final LazyResource<ConfluentAdmin> localConnAdmin;
    /** Monitor held while updating persistentConnections together with a client's persistentConnectionSource. */
    private final Object persistentConnectionSourceLock;

    /**
     * Static forwarder to the counter exposed by the Scala companion object.
     *
     * @return the companion object's {@code NextReverseRequestId} counter
     */
    public static AtomicInteger NextReverseRequestId() {
        final ClusterLinkInboundConnectionManager$ companion = ClusterLinkInboundConnectionManager$.MODULE$;
        return companion.NextReverseRequestId();
    }

    /**
     * Compiler-generated accessor exposing the superclass {@code localLogicalCluster()}.
     *
     * @return the value returned by the superclass implementation
     */
    private /* synthetic */ String super$localLogicalCluster() {
        final String logicalCluster = super.localLogicalCluster();
        return logicalCluster;
    }

    /**
     * Compiler-generated accessor exposing the superclass {@code linkData()} so that
     * lambdas in this class can reach it.
     *
     * @return the value returned by the superclass implementation
     */
    private /* synthetic */ ClusterLinkData super$linkData() {
        final ClusterLinkData data = super.linkData();
        return data;
    }

    /**
     * @return the map of outstanding reverse connection requests, keyed by boxed request id
     */
    private ConcurrentHashMap<Object, ReverseClient> connectionRequests() {
        return connectionRequests;
    }

    /**
     * @return the counter used to allocate ids for new reverse connection requests
     */
    private AtomicInteger nextReverseRequestId() {
        return nextReverseRequestId;
    }

    /**
     * @return the counter of persistent reverse connections (connections received without a request id)
     */
    private AtomicInteger persistentConnections() {
        return persistentConnections;
    }

    /**
     * @return the counter of reverse connections that are currently open
     */
    private AtomicInteger activeReverseConnections() {
        return activeReverseConnections;
    }

    /**
     * Reads the volatile holder of the client that owns persistent reverse connections.
     *
     * @return the current reverse connection admin client, or an empty option when none is set
     */
    private Option<ReverseClient> reverseConnectionAdmin() {
        return reverseConnectionAdmin;
    }

    /**
     * Scala-style setter for the volatile reverse connection admin holder.
     *
     * @param option the new value to publish
     */
    private void reverseConnectionAdmin_$eq(Option<ReverseClient> option) {
        reverseConnectionAdmin = option;
    }

    /**
     * @return the lazily created admin client holder used for requests to the local link coordinator
     */
    private LazyResource<ConfluentAdmin> localConnAdmin() {
        return localConnAdmin;
    }

    /**
     * @return the monitor held while the persistent connection count and a client's
     *         persistent connection source are updated together
     */
    private Object persistentConnectionSourceLock() {
        return persistentConnectionSourceLock;
    }

    /**
     * Starts the connection manager after verifying that the link was not configured
     * for outbound connections.
     *
     * @throws IllegalStateException if the initial configuration uses outbound connection mode
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager, kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void startup() {
        final ConnectionMode configuredMode = this.initialConfig.connectionMode();
        final ConnectionMode$Outbound$ outbound = ConnectionMode$Outbound$.MODULE$;
        final boolean outboundMode = configuredMode != null && configuredMode.equals(outbound);
        if (outboundMode) {
            throw new IllegalStateException("Inbound connection manager created in outbound connection mode");
        }
        super.startup();
    }

    /**
     * Enables cluster link requests on the network client wrapped by the given link client.
     *
     * <p>When the wrapped client is a {@link NetworkClient}, it is given the link id, the
     * optional interceptor (null when absent) and the reverse connection provider (null when
     * none applies). Any other client type is left untouched, unless the link is currently in
     * inbound mode, in which case the call fails.
     *
     * @param clusterLinkNetworkClient the link client whose network client should be enabled
     * @param option optional admin metadata manager forwarded to the reverse connection provider
     * @throws IllegalStateException if the link is in inbound mode and the wrapped client is not a NetworkClient
     */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void enableClusterLink(ClusterLinkNetworkClient clusterLinkNetworkClient, Option<AdminMetadataManager> option) {
        final KafkaClient client = clusterLinkNetworkClient.networkClient();
        if (!(client instanceof NetworkClient)) {
            // Reverse connections need NetworkClient; other clients are only acceptable outside inbound mode.
            final ConnectionMode mode = currentConfig().connectionMode();
            final ConnectionMode$Inbound$ inbound = ConnectionMode$Inbound$.MODULE$;
            final boolean inboundMode = mode != null && mode.equals(inbound);
            if (inboundMode) {
                throw new IllegalStateException("Reverse connections are supported only with NetworkClient");
            }
            return;
        }
        final NetworkClient reversibleClient = (NetworkClient) client;
        reversibleClient.enableClusterLinkRequests(
                super.linkData().linkId(),
                (ClientInterceptor) this.clientInterceptor.orNull($less$colon$less$.MODULE$.refl()),
                (ReverseNode.ConnectionProvider) reverseConnectionProvider(reversibleClient, option, clusterLinkNetworkClient.clientId()).orNull($less$colon$less$.MODULE$.refl()));
    }

    /**
     * Builds the callback a network client uses to ask for a reverse connection from a source node.
     *
     * <p>A provider is only returned while the link is in inbound mode. Each invocation of the
     * provider allocates a fresh request id and calls {@code requestReverseConnection} with this
     * broker's id as the destination broker.
     *
     * @param networkClient the network client that will own the reverse connections
     * @param option optional admin metadata manager passed to the created {@code ReverseClient}
     * @param str client id passed to the created {@code ReverseClient}
     * @return the provider wrapped in {@code Some}, or {@code None} when the link is not in inbound mode
     */
    public Option<ReverseNode.ConnectionProvider> reverseConnectionProvider(NetworkClient networkClient, Option<AdminMetadataManager> option, String str) {
        ConnectionMode connectionMode = currentConfig().connectionMode();
        ConnectionMode$Inbound$ connectionMode$Inbound$ = ConnectionMode$Inbound$.MODULE$;
        if (connectionMode == null || !connectionMode.equals(connectionMode$Inbound$)) {
            return None$.MODULE$;
        }
        // Kept from the compiled output: the companion object is read before the client is constructed.
        ReverseClient$ reverseClient$ = ReverseClient$.MODULE$;
        ReverseClient reverseClient = new ReverseClient(networkClient, option, None$.MODULE$, str);
        // A lambda needs a functional-interface target type, so bind it to a typed local
        // instead of passing it straight to the raw Some constructor.
        ReverseNode.ConnectionProvider provider = node -> {
            // The broker id is read at call time, exactly as in the original lambda.
            this.requestReverseConnection(this.nextReverseRequestId().incrementAndGet(), reverseClient, node, this.brokerConfig.brokerId());
        };
        return new Some<>(provider);
    }

    /* JADX WARN: Type inference failed for: r0v36, types: [java.lang.Throwable, java.lang.Object] */
    @Override // kafka.server.link.ClusterLinkFactory.InboundConnectionManager
    public void processReverseConnection(KafkaChannel kafkaChannel, ReverseNode reverseNode) {
        Option<ReverseClient> apply;
        boolean z;
        debug(() -> {
            return new StringBuilder(53).append("Process reverse connection in destination cluster : ").append(kafkaChannel).append(" ").append(reverseNode).toString();
        });
        ensureReverseConnectionsEnabled();
        if (reverseNode.requestId().isPresent()) {
            apply = Option$.MODULE$.apply(connectionRequests().remove(reverseNode.requestId().get()));
            z = false;
        } else {
            maybeCreateRemoteAdmin();
            apply = reverseConnectionAdmin();
            z = true;
        }
        boolean z2 = z;
        Option<ReverseClient> option = apply;
        if (!(option instanceof Some)) {
            if (!None$.MODULE$.equals(option)) {
                throw new MatchError(option);
            }
            throw new NetworkException("Reverse connection is no longer required");
        }
        ReverseClient reverseClient = (ReverseClient) ((Some) option).value();
        Consumer consumer = kafkaChannel2 -> {
            this.debug(() -> {
                return new StringBuilder(38).append("Reverse channel ").append(kafkaChannel2).append(" has been disconnected").toString();
            });
            this.metrics.reverseConnectionClosedSensor().record();
            this.metrics.deprecatedReverseConnectionClosedSensor().record();
            this.activeReverseConnections().decrementAndGet();
            if (z2) {
                ?? persistentConnectionSourceLock = this.persistentConnectionSourceLock();
                synchronized (persistentConnectionSourceLock) {
                    if (this.persistentConnections().decrementAndGet() <= 0) {
                        reverseClient.persistentConnectionSource_$eq(None$.MODULE$);
                        if (this.isLinkCoordinator()) {
                            persistentConnectionSourceLock = this;
                            persistentConnectionSourceLock.info(() -> {
                                return "Persistent connection to source link coordinator was disconnected, awaiting new connection.";
                            });
                        }
                    }
                }
            }
        };
        activeReverseConnections().incrementAndGet();
        if (z2) {
            ?? persistentConnectionSourceLock = persistentConnectionSourceLock();
            synchronized (persistentConnectionSourceLock) {
                persistentConnections().incrementAndGet();
                reverseClient.persistentConnectionSource_$eq(new Some(Predef$.MODULE$.int2Integer(reverseNode.remoteBrokerId())));
            }
        }
        this.metrics.reverseConnectionCreatedSensor().record();
        this.metrics.deprecatedReverseConnectionCreatedSensor().record();
        ReverseChannel reverseChannel = new ReverseChannel(kafkaChannel, reverseNode, consumer);
        reverseClient.networkClient().reverseAndAdd(reverseChannel);
        reverseClient.bootstrapWithReverseChannel(reverseChannel, this.time.milliseconds());
        info(() -> {
            return new StringBuilder(64).append("Added reverse channel ").append(reverseChannel).append(" from source to network client, requestId=").append(reverseNode.requestId()).toString();
        });
    }

    /* JADX WARN: Code restructure failed: missing block: B:15:0x004c, code lost:
    
        if (r0.equals(r1) != false) goto L10;
     */
    /**
     * Handles an initiate-reverse-connections request.
     *
     * <p>NOTE(review): the decompiler could not restructure this method, so the Java body below
     * only throws {@link UnsupportedOperationException}. The description that follows is read
     * from the instruction dump kept in the body comment and must be confirmed against the
     * original Scala source:
     * <ol>
     *   <li>log at debug level and call {@code ensureReverseConnectionsEnabled()};</li>
     *   <li>build a list with one element per request entry using {@code List.fill}; the
     *       element factory is not visible here (presumably a new CompletableFuture per
     *       entry - confirm);</li>
     *   <li>throw {@code InvalidRequestException} when the request's source cluster id equals
     *       the local logical cluster, otherwise call {@code forwardToRemoteCoordinator};</li>
     *   <li>on any {@code Throwable}, log through {@code error} and apply a per-element
     *       function to the list (presumably completing each future exceptionally - confirm);</li>
     *   <li>return the list.</li>
     * </ol>
     *
     * @param r7 the initiate-reverse-connections request
     * @param r8 the request context (not read anywhere in the instruction dump)
     * @return per the dump, the list of futures created in step 2; the current Java body never returns
     */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    /* renamed from: initiateReverseConnections */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> mo1393initiateReverseConnections(org.apache.kafka.common.requests.InitiateReverseConnectionsRequest r7, org.apache.kafka.common.requests.RequestContext r8) {
        /*
            r6 = this;
            r0 = r6
            r1 = r6
            r2 = r7
            scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r1 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$1(r1, r2);
            }
            r0.debug(r1)
            r0 = r6
            r0.ensureReverseConnectionsEnabled()
            r0 = r7
            org.apache.kafka.common.message.InitiateReverseConnectionsRequestData r0 = r0.data()
            r9 = r0
            scala.package$ r0 = scala.package$.MODULE$
            scala.collection.immutable.List$ r0 = r0.List()
            r1 = r9
            java.util.List r1 = r1.entries()
            int r1 = r1.size()
            scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r2 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$2();
            }
            scala.collection.SeqOps r0 = r0.fill(r1, r2)
            scala.collection.immutable.List r0 = (scala.collection.immutable.List) r0
            r10 = r0
            r0 = r6
            java.lang.String r0 = super.localLogicalCluster()     // Catch: java.lang.Throwable -> L81
            r1 = r9
            java.lang.String r1 = r1.sourceClusterId()     // Catch: java.lang.Throwable -> L81
            r11 = r1
            r1 = r0
            if (r1 != 0) goto L47
        L3f:
            r0 = r11
            if (r0 == 0) goto L4f
            goto L77
        L47:
            r1 = r11
            boolean r0 = r0.equals(r1)     // Catch: java.lang.Throwable -> L81
            if (r0 == 0) goto L77
        L4f:
            org.apache.kafka.common.errors.InvalidRequestException r0 = new org.apache.kafka.common.errors.InvalidRequestException     // Catch: java.lang.Throwable -> L81
            r1 = r0
            java.lang.StringBuilder r2 = new java.lang.StringBuilder     // Catch: java.lang.Throwable -> L81
            r3 = r2
            r4 = 70
            r3.<init>(r4)     // Catch: java.lang.Throwable -> L81
            java.lang.String r3 = "Cannot initiate reverse connection from destination cluster "
            java.lang.StringBuilder r2 = r2.append(r3)     // Catch: java.lang.Throwable -> L81
            r3 = r6
            java.lang.String r3 = super.localLogicalCluster()     // Catch: java.lang.Throwable -> L81
            java.lang.StringBuilder r2 = r2.append(r3)     // Catch: java.lang.Throwable -> L81
            java.lang.String r3 = " to itself"
            java.lang.StringBuilder r2 = r2.append(r3)     // Catch: java.lang.Throwable -> L81
            java.lang.String r2 = r2.toString()     // Catch: java.lang.Throwable -> L81
            r1.<init>(r2)     // Catch: java.lang.Throwable -> L81
            throw r0     // Catch: java.lang.Throwable -> L81
        L77:
            r0 = r6
            r1 = r9
            r2 = r10
            r0.forwardToRemoteCoordinator(r1, r2)     // Catch: java.lang.Throwable -> L81
            goto L9f
        L81:
            r12 = move-exception
            r0 = r6
            scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r1 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$3();
            }
            r2 = r12
            scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r2 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$4(r2);
            }
            r0.error(r1, r2)
            r0 = r10
            r1 = r12
            scala.collection.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r1 = (v1) -> { // scala.Function1.apply(java.lang.Object):java.lang.Object
                return $anonfun$initiateReverseConnections$5$adapted(r1, v1);
            }
            r0.foreach(r1)
        L9f:
            r0 = r10
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.server.link.ClusterLinkInboundConnectionManager.mo1393initiateReverseConnections(org.apache.kafka.common.requests.InitiateReverseConnectionsRequest, org.apache.kafka.common.requests.RequestContext):scala.collection.Seq");
    }

    /**
     * Asks the source cluster to open a reverse connection to this broker.
     *
     * <p>The client is registered in the pending-request map under the request id before the
     * request is sent. The request is forwarded to the remote link coordinator when this broker
     * is the link coordinator or has at least one persistent connection; otherwise it is sent
     * through the local admin client to the local link coordinator. If sending fails
     * synchronously, the pending entry is removed and the failure is rethrown.
     *
     * @param i request id allocated for this reverse connection
     * @param reverseClient client that needs the connection
     * @param node source node the connection should come from
     * @param i2 id of the destination (local) broker
     * @throws NetworkException if the reverse connection admin matches the visible-elsewhere
     *         predicate that signals a persistent connection is still awaited
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void requestReverseConnection(int i, ReverseClient reverseClient, Node node, int i2) {
        debug(() -> "Requesting reverse connection for dest broker " + i2 + " with requestId " + i + " to source node " + node + " for client " + reverseClient.clientId());
        ensureReverseConnectionsEnabled();

        final boolean awaitingPersistentConnection = reverseConnectionAdmin().exists(candidate -> {
            return BoxesRunTime.boxToBoolean($anonfun$requestReverseConnection$2(reverseClient, candidate));
        });
        if (awaitingPersistentConnection) {
            throw new NetworkException("Waiting for persistent connection to request reverse connection for request id " + i);
        }

        final InitiateReverseConnectionsRequestData requestData = new InitiateReverseConnectionsRequestData()
                .setClusterLinkId(linkId())
                .setForwardToBroker(true)
                .setTimeoutMs(Predef$.MODULE$.Integer2int(currentConfig().reverseConnectionSetupTimeoutMs()))
                .setSourceClusterId((String) super.linkData().clusterId().orNull($less$colon$less$.MODULE$.refl()))
                .setTargetClusterId(super.localLogicalCluster())
                .setEntries(Collections.singletonList(
                        new InitiateReverseConnectionsRequestData.EntryData()
                                .setInitiateRequestId(i)
                                .setSourceBrokerId(node.id())
                                .setTargetBrokerId(i2)));

        // Register first so that a connection arriving right after the send finds its client.
        connectionRequests().put(BoxesRunTime.boxToInteger(i), reverseClient);
        try {
            if (isLinkCoordinator() || persistentConnectionCount() > 0) {
                final CompletableFuture pending = new CompletableFuture();
                forwardToRemoteCoordinator(requestData, new $colon.colon(pending, Nil$.MODULE$));
                pending.whenComplete((ignoredResult, failure) -> {
                    this.onCompletion$1(failure, i, reverseClient, node);
                });
            } else {
                final ConfluentAdmin localAdmin = localConnAdmin().getOrCreate(true);
                final Integer coordinatorId = Predef$.MODULE$.int2Integer(BoxesRunTime.unboxToInt(linkCoordinatorId().getOrElse(() -> {
                    throw new CoordinatorNotAvailableException("Cluster link coordinator not available for " + this.super$linkData().linkName());
                })));
                final Map responses = ConfluentAdminUtils.initiateReverseConnections(localAdmin, requestData, coordinatorId);
                ((KafkaFutureImpl) responses.get(BoxesRunTime.boxToInteger(i))).whenComplete((ignoredResult, failure) -> {
                    this.onCompletion$1(failure, i, reverseClient, node);
                });
            }
        } catch (Throwable sendFailure) {
            connectionRequests().remove(BoxesRunTime.boxToInteger(i));
            throw sendFailure;
        }
    }

    /**
     * Sends an initiate-reverse-connections request to the remote link coordinator through the
     * reverse connection admin client and mirrors each per-entry outcome onto the matching future.
     *
     * <p>Request entries and futures are paired by position. Each future is completed, normally
     * or exceptionally, when the admin future for the entry's request id completes.
     *
     * @param initiateReverseConnectionsRequestData the request to forward
     * @param seq one future per request entry, in entry order
     * @throws NetworkException if this broker is the link coordinator but no remote admin client
     *         is available, or if no persistent connection source is known yet
     * @throws NotControllerException if no remote admin client is available and this broker is
     *         not the link coordinator
     */
    private void forwardToRemoteCoordinator(InitiateReverseConnectionsRequestData initiateReverseConnectionsRequestData, Seq<CompletableFuture<Void>> seq) {
        // Captured before the lookups so the error raised below reflects the role at entry.
        final boolean coordinator = isLinkCoordinator();

        final ConfluentAdmin remoteAdmin = (ConfluentAdmin) reverseConnectionAdmin().flatMap(owner -> {
            return owner.adminClient().map(linkAdmin -> {
                return linkAdmin.admin();
            });
        }).getOrElse(() -> {
            if (!coordinator) {
                throw new NotControllerException("Request cannot be forwarded to remote link coordinator since this broker is not the local link coordinator.");
            }
            throw new NetworkException("Request cannot be forwarded to remote link coordinator at this time.");
        });

        final Integer remoteCoordinatorId = (Integer) reverseConnectionAdmin().flatMap(owner -> {
            return owner.persistentConnectionSource();
        }).getOrElse(() -> {
            throw new NetworkException("Request cannot be forwarded to remote link coordinator because persistent connection is not yet available");
        });

        debug(() -> "Forward initiate reverse connection request to remote link coordinator: " + initiateReverseConnectionsRequestData + " remoteCoordinator=" + remoteCoordinatorId);

        final Map responsesByRequestId = ConfluentAdminUtils.initiateReverseConnections(remoteAdmin, initiateReverseConnectionsRequestData, remoteCoordinatorId);
        final IterableOnceOps entriesWithFutures = (IterableOnceOps) CollectionConverters$.MODULE$.ListHasAsScala(initiateReverseConnectionsRequestData.entries()).asScala().zip(seq);
        entriesWithFutures.foreach(pair -> {
            if (pair == null) {
                throw new MatchError((Object) null);
            }
            final InitiateReverseConnectionsRequestData.EntryData entry = (InitiateReverseConnectionsRequestData.EntryData) pair._1();
            final CompletableFuture callerFuture = (CompletableFuture) pair._2();
            final KafkaFutureImpl response = (KafkaFutureImpl) responsesByRequestId.get(BoxesRunTime.boxToInteger(entry.initiateRequestId()));
            return response.whenComplete((result, failure) -> {
                if (failure == null) {
                    this.debug(() -> "Completed InitiateReverseConnectionsRequest for requestId=" + entry.initiateRequestId());
                    callerFuture.complete(result);
                } else {
                    this.warn(() -> "Initiate reverse connection request failed for requestId=" + entry.initiateRequestId(), () -> {
                        return failure;
                    });
                    callerFuture.completeExceptionally(failure);
                }
            });
        });
    }

    /**
     * Handles a controller change notification by re-evaluating whether the reverse
     * connection admin should be (re)created or closed on this broker.
     *
     * @param z supplied by the caller but not consulted by this implementation; the
     *          decision is made from {@code isLinkCoordinator()} inside
     *          {@code maybeProcessCoordinatorChange()}
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager, kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void onControllerChange(boolean z) {
        this.maybeProcessCoordinatorChange();
    }

    /**
     * Handles a link metadata partition leader change. Takes exactly the same path as
     * {@link #onControllerChange(boolean)}: the coordinator state is re-evaluated and the
     * reverse connection admin is reset or closed accordingly.
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager, kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void onLinkMetadataPartitionLeaderChange() {
        this.maybeProcessCoordinatorChange();
    }

    /**
     * Reconciles the reverse connection admin with the current coordinator status while
     * holding {@code stateChangeLock()}.
     *
     * <ul>
     *   <li>No admin present and this broker is the link coordinator: the admin is reset via
     *       {@code resetReverseConnectionAdmin()}.</li>
     *   <li>Admin present and this broker is not the link coordinator: the admin is closed
     *       via {@code closeReverseConnectionAdmin()}.</li>
     *   <li>Any other combination: nothing is changed.</li>
     * </ul>
     */
    private void maybeProcessCoordinatorChange() {
        synchronized (stateChangeLock()) {
            // Sample the coordinator flag once so the log line and the decision agree.
            final boolean coordinatorNow = isLinkCoordinator();
            debug(() -> "Process controller or metadata partition leader change isCoordinator=" + coordinatorNow);
            final boolean adminMissing = reverseConnectionAdmin().isEmpty();
            if (adminMissing && coordinatorNow) {
                resetReverseConnectionAdmin();
            } else if (!adminMissing && !coordinatorNow) {
                closeReverseConnectionAdmin();
            }
        }
    }

    /**
     * Closes and forgets the reverse connection admin.
     *
     * <p>Steps, in order: log at debug level; close the admin client held by the current
     * reverse client, if any (close failures are swallowed and logged at WARN by the
     * helper); set the reverse connection admin to {@code None}; finally, when this manager
     * is not active, close the lazily created local connection admin as well.
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager
    public void closeReverseConnectionAdmin() {
        debug(() -> "Closing reverse connection admin");
        reverseConnectionAdmin()
            .flatMap(holder -> holder.adminClient())
            .foreach(remoteAdmin -> {
                $anonfun$closeReverseConnectionAdmin$3(this, remoteAdmin);
                return BoxedUnit.UNIT;
            });
        reverseConnectionAdmin_$eq(None$.MODULE$);
        // The local admin is only released once the manager is no longer active.
        if (!isActive()) {
            localConnAdmin().closeResource();
        }
    }

    /**
     * Logs the intent to recreate the admin client used for connection reversal requests
     * and then delegates to {@code maybeCreateRemoteAdmin()}, which only creates one when
     * none exists and this broker is the link coordinator.
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager
    public void createReverseConnectionAdmin() {
        debug(() -> "Recreate admin client used to initiate connection reversal requests");
        this.maybeCreateRemoteAdmin();
    }

    /**
     * Builds the local connection admin for this link.
     *
     * <p>{@code ensureReverseConnectionsEnabled()} is invoked first; only afterwards is the
     * link name read and handed to {@code localConnAdminFactory}.
     *
     * @return the admin produced by {@code localConnAdminFactory} for this link's name
     */
    /* JADX INFO: Access modifiers changed from: private */
    public ConfluentAdmin createLocalConnAdmin() {
        ensureReverseConnectionsEnabled();
        final String linkName = super.linkData().linkName();
        return (ConfluentAdmin) this.localConnAdminFactory.apply(linkName);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v13, types: [kafka.server.link.ClusterLinkInboundConnectionManager] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    private void maybeCreateRemoteAdmin() {
        ?? stateChangeLock = stateChangeLock();
        synchronized (stateChangeLock) {
            if (reverseConnectionAdmin().isEmpty() && isLinkCoordinator()) {
                ClusterLinkAdminClient clusterLinkAdminClient = (ClusterLinkAdminClient) this.remoteAdminFactory.apply(currentConfig(), this);
                stateChangeLock = this;
                stateChangeLock.reverseConnectionAdmin_$eq(new Some(new ReverseClient(clusterLinkAdminClient.networkClient(), new Some(clusterLinkAdminClient.metadataManager()), new Some(clusterLinkAdminClient), clusterLinkAdminClient.clientId())));
            }
        }
    }

    /**
     * Exposes the network client of the current reverse connection admin.
     *
     * @return an option holding the reverse client's network client, or an empty option
     *         when no reverse connection admin is set
     */
    public Option<NetworkClient> reverseConnectionClient() {
        return reverseConnectionAdmin().map(holder -> holder.networkClient());
    }

    /**
     * Reports the persistent connection counter.
     *
     * @return the current value of {@code persistentConnections()} when this broker is the
     *         link coordinator, otherwise {@code 0}
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager, kafka.server.link.ClusterLinkFactory.ConnectionManager
    public int persistentConnectionCount() {
        return isLinkCoordinator() ? persistentConnections().get() : 0;
    }

    /**
     * Reports the active reverse connection counter.
     *
     * @return the current value of {@code activeReverseConnections()}
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager, kafka.server.link.ClusterLinkFactory.ConnectionManager
    public int reverseConnectionCount() {
        final AtomicInteger activeCounter = activeReverseConnections();
        return activeCounter.get();
    }

    /**
     * Lists the lazily created resources owned by this manager.
     *
     * @return a single-element Scala sequence containing {@code localConnAdmin()}
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager, kafka.server.link.ClusterLinkFactory.ConnectionManager
    /* renamed from: lazyResources */
    public Seq<LazyResource<?>> mo1392lazyResources() {
        // `new ::(head, Nil)` is the compiled form of a one-element immutable list.
        return new $colon.colon(localConnAdmin(), Nil$.MODULE$);
    }

    /**
     * Close action for the local connection admin: closes the given admin with a zero
     * timeout.
     *
     * @param confluentAdmin the admin to close
     */
    public static final /* synthetic */ void $anonfun$localConnAdmin$2(ConfluentAdmin confluentAdmin) {
        final Duration closeTimeout = Duration.ZERO;
        confluentAdmin.close(closeTimeout);
    }

    /**
     * Fails the given future with the supplied throwable.
     *
     * @param th                the failure to propagate
     * @param completableFuture the future to complete exceptionally
     * @return {@code true} if this call moved the future to a completed state,
     *         {@code false} if it was already completed
     */
    public static final /* synthetic */ boolean $anonfun$initiateReverseConnections$5(Throwable th, CompletableFuture completableFuture) {
        final boolean transitioned = completableFuture.completeExceptionally(th);
        return transitioned;
    }

    /**
     * Tests whether two reverse clients share the very same network client instance.
     * The comparison is by reference identity, not {@code equals}.
     *
     * @param reverseClient  the reference client
     * @param reverseClient2 the candidate client
     * @return {@code true} when both expose the identical network client object
     */
    public static final /* synthetic */ boolean $anonfun$requestReverseConnection$2(ReverseClient reverseClient, ReverseClient reverseClient2) {
        final Object candidateClient = reverseClient2.networkClient();
        final Object referenceClient = reverseClient.networkClient();
        return candidateClient == referenceClient;
    }

    /**
     * Completion callback for a reverse connection request.
     *
     * <p>On success only a debug line is logged. On failure a warning is logged with the
     * throwable, the request id is removed from {@code connectionRequests()}, and the
     * reverse client's network client is told about the failure for the given node.
     *
     * @param th            the failure, or {@code null} when the request succeeded
     * @param i             the reverse connection request id
     * @param reverseClient the reverse client whose network client is notified on failure
     * @param node          the node passed to {@code processReverseConnectionFailure}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final void onCompletion$1(Throwable th, int i, ReverseClient reverseClient, Node node) {
        if (th != null) {
            warn(() -> "Failed to create reverse connection for requestId=" + i, () -> th);
            // Drop the bookkeeping entry before notifying the network client.
            connectionRequests().remove(Integer.valueOf(i));
            reverseClient.networkClient().processReverseConnectionFailure(node);
        } else {
            debug(() -> "Reverse connection has been created for requestId=" + i);
        }
    }

    /**
     * Closes the given cluster link admin client on a best-effort basis: the close call is
     * run through {@code CoreUtils.swallow}, with the manager as the logging target and
     * {@code WARN} as the level.
     *
     * @param clusterLinkInboundConnectionManager the manager passed to {@code swallow} for logging
     * @param clusterLinkAdminClient              the admin client to close
     */
    public static final /* synthetic */ void $anonfun$closeReverseConnectionAdmin$3(ClusterLinkInboundConnectionManager clusterLinkInboundConnectionManager, ClusterLinkAdminClient clusterLinkAdminClient) {
        // Must stay typed as the Unit-specialised thunk: a void lambda cannot target a
        // plain Function0 from Java.
        JFunction0.mcV.sp closeAction = () -> clusterLinkAdminClient.close();
        CoreUtils$.MODULE$.swallow(closeAction, clusterLinkInboundConnectionManager, Level.WARN);
    }

    /**
     * Creates the inbound connection manager for one cluster link.
     *
     * <p>Besides storing its arguments, the constructor starts with an empty request map,
     * zeroed connection counters, no reverse connection admin, and a lazily created local
     * connection admin. It finishes by setting the log prefix from the link name and the
     * broker id.
     *
     * @param clusterLinkData            link data; forwarded to the superclass
     * @param clusterLinkConfig          stored as {@code initialConfig} and forwarded to the superclass
     * @param str                        forwarded to the superclass only (meaning not visible here)
     * @param option                     optional client interceptor, stored as {@code clientInterceptor}
     * @param clusterLinkMetrics         stored as {@code metrics} and forwarded to the superclass
     * @param function2                  factory stored as {@code remoteAdminFactory}
     * @param function1                  factory stored as {@code localConnAdminFactory}
     * @param clusterLinkMetadataManager forwarded to the superclass
     * @param kafkaConfig                stored as {@code brokerConfig}; its broker id is used in the log prefix
     * @param time                       stored as {@code time} and passed to the lazy local admin resource
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ClusterLinkInboundConnectionManager(ClusterLinkData clusterLinkData, ClusterLinkConfig clusterLinkConfig, String str, Option<ClientInterceptor> option, ClusterLinkMetrics clusterLinkMetrics, Function2<ClusterLinkConfig, ClusterLinkInboundConnectionManager, ClusterLinkAdminClient> function2, Function1<String, KafkaAdminClient> function1, ClusterLinkMetadataManager clusterLinkMetadataManager, KafkaConfig kafkaConfig, Time time) {
        super(clusterLinkData, clusterLinkConfig, str, clusterLinkMetadataManager, clusterLinkMetrics, kafkaConfig);
        this.initialConfig = clusterLinkConfig;
        this.clientInterceptor = option;
        this.metrics = clusterLinkMetrics;
        this.remoteAdminFactory = function2;
        this.localConnAdminFactory = function1;
        this.brokerConfig = kafkaConfig;
        this.time = time;
        this.connectionRequests = new ConcurrentHashMap<>();
        // Starting request id comes from the companion object's constant.
        this.nextReverseRequestId = ClusterLinkInboundConnectionManager$.MODULE$.NextReverseRequestId();
        this.persistentConnections = new AtomicInteger();
        this.activeReverseConnections = new AtomicInteger();
        // No reverse connection admin until one is explicitly created.
        this.reverseConnectionAdmin = None$.MODULE$;
        // Lazy local admin: built by createLocalConnAdmin(), closed with a zero timeout,
        // and given the clock plus a supplier of the current clientsMaxIdleMs setting.
        this.localConnAdmin = new LazyResource<>(() -> {
            return this.createLocalConnAdmin();
        }, confluentAdmin -> {
            $anonfun$localConnAdmin$2(confluentAdmin);
            return BoxedUnit.UNIT;
        }, time, () -> {
            return this.currentConfig().clientsMaxIdleMs();
        });
        this.persistentConnectionSourceLock = new Object();
        // Log prefix identifies both the link and the broker.
        logIdent_$eq(new StringBuilder(47).append("[ClusterLinkInboundConnectionManager-").append(super.linkData().linkName()).append("-broker-").append(kafkaConfig.brokerId()).append("] ").toString());
    }
}
