package kafka.link;

import kafka.server.KafkaServer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: SourceConnectionIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001i2AAB\u0004\u0001\u0019!)\u0011\u0003\u0001C\u0001%!9A\u0003\u0001b\u0001\n\u0003*\u0002B\u0002\u000f\u0001A\u0003%a\u0003C\u0003\u001e\u0001\u0011\u0005a\u0004C\u00030\u0001\u0011%\u0001GA\u0010T_V\u00148-Z\"p]:,7\r^5p]&sG/Z4sCRLwN\u001c+fgRT!\u0001C\u0005\u0002\t1Lgn\u001b\u0006\u0002\u0015\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u000e!\tqq\"D\u0001\b\u0013\t\u0001rA\u0001\u000eDYV\u001cH/\u001a:MS:\\\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u0002'A\u0011a\u0002A\u0001\u001fkN,7k\\;sG\u0016\u001cuN\u001c8fGRLwN\\(sS\u001eLg.\u0019;j_:,\u0012A\u0006\t\u0003/ii\u0011\u0001\u0007\u0006\u00023\u0005)1oY1mC&\u00111\u0004\u0007\u0002\b\u0005>|G.Z1o\u0003})8/Z*pkJ\u001cWmQ8o]\u0016\u001cG/[8o\u001fJLw-\u001b8bi&|g\u000eI\u0001+i\u0016\u001cHoQ8oiJ|G\u000e\\3s\u0007\"\fgnZ3XSRD'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t)\u0005y\u0002CA\f!\u0013\t\t\u0003D\u0001\u0003V]&$\bF\u0001\u0003$!\t!S&D\u0001&\u0015\t1s%A\u0002ba&T!\u0001K\u0015\u0002\u000f),\b/\u001b;fe*\u0011!fK\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002Y\u0005\u0019qN]4\n\u00059*#\u0001\u0002+fgR\faD^3sS\u001aL(+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8NKR\u0014\u0018nY:\u0015\u0005}\t\u0004\"\u0002\u001a\u0006\u0001\u00041\u0012AD3ya\u0016\u001cGOR1jYV\u0014Xm\u001d\u0015\u0005\u0001Q:\u0004\b\u0005\u0002%k%\u0011a'\n\u0002\u0004)\u0006<\u0017!\u0002<bYV,\u0017%A\u001d\u0002\u0017%tG/Z4sCRLwN\u001c")
/* loaded from: input_file:kafka/link/SourceConnectionIntegrationTest.class */
public class SourceConnectionIntegrationTest extends ClusterLinkIntegrationTest {
    /** Backing value for {@link #useSourceConnectionOrigination()}; fixed to {@code true} for this test class. */
    private final boolean useSourceConnectionOrigination = true;

    /**
     * Overrides the flag inherited from the base integration test.
     *
     * @return always {@code true} for this class
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public boolean useSourceConnectionOrigination() {
        return useSourceConnectionOrigination;
    }

    /**
     * Checks that mirroring keeps working after the controller broker of each cluster is killed.
     *
     * <p>Sequence: create the cluster link, create the topic on the source cluster and link it on
     * the destination, produce and wait for the mirror, then kill the destination controller and
     * afterwards the source controller, producing and waiting for the mirror after each kill.
     * Reverse-connection metrics are verified once before any broker is killed and once at the end.
     */
    @Test
    public void testControllerChangeWithReverseConnections() {
        // Set up the link and the mirrored topic (the literal 3 is presumably the replication factor — confirm).
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3());
        sourceCluster().createTopic(topic(), numPartitions(), 3, sourceCluster().createTopic$default$4());
        destCluster().linkTopic(topic(), (short) 3, linkName(), destCluster().linkTopic$default$4());

        // Baseline: mirror ten records and check the metrics with the failure flag set to false.
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyReverseConnectionMetrics(false);

        // Kill the destination controller, then keep producing.
        KafkaServer destController = destCluster().controller();
        destCluster().killBroker(destCluster().servers().indexOf(destController));
        produceToSourceCluster(10);

        // Destination servers other than the one registered under the killed controller's broker id.
        Buffer remainingDestServers = (Buffer) destCluster().servers().filter(candidate ->
                BoxesRunTime.boxToBoolean(
                        $anonfun$testControllerChangeWithReverseConnections$1(this, destController, candidate)));
        waitForMirror(remainingDestServers, waitForMirror$default$2());

        // Kill the source controller as well and make sure mirroring still catches up.
        sourceCluster().killBroker(sourceCluster().servers().indexOf(sourceCluster().controller()));
        produceToSourceCluster(10);
        waitForMirror(remainingDestServers, waitForMirror$default$2());

        // Final checks: metrics with the failure flag set to true, then mirror verification.
        verifyReverseConnectionMetrics(true);
        verifyMirror(topic(), remainingDestServers, verifyMirror$default$3());
    }

    /**
     * Verifies the reverse-connection metrics on both clusters.
     *
     * <p>First, individual metrics are checked against fixed inclusive ranges for the "source" and
     * "destination" modes. Then the totals across alive servers are cross-checked against each
     * other: link counts against the number of alive servers, active connection counts between the
     * two clusters, and active connection counts against created-minus-closed totals.
     *
     * @param z forwarded as the third argument of {@code verifyKafkaMetric} for
     *          {@code reverse-connection-failed-total}; presumably whether failures are expected
     *          (TODO confirm against {@code verifyKafkaMetric})
     */
    private void verifyReverseConnectionMetrics(boolean z) {
        // Per-metric range checks, controller first and then all alive servers.
        verifyMetricRange$1(new $colon.colon(sourceCluster().controller(), Nil$.MODULE$), "controller-reverse-connection-count", "source", 1.0d, 2.0d);
        verifyMetricRange$1(new $colon.colon(destCluster().controller(), Nil$.MODULE$), "controller-reverse-connection-count", "destination", 1.0d, 2.0d);
        verifyMetricRange$1(sourceCluster().aliveServers(), "reverse-connection-count", "source", 2.0d, 10.0d);
        verifyMetricRange$1(destCluster().aliveServers(), "reverse-connection-count", "destination", 2.0d, 10.0d);
        verifyMetricRange$1(sourceCluster().aliveServers(), "reverse-connection-created-total", "source", 2.0d, 1000.0d);
        verifyMetricRange$1(destCluster().aliveServers(), "reverse-connection-created-total", "destination", 2.0d, 1000.0d);
        verifyMetricRange$1(sourceCluster().aliveServers(), "reverse-connection-closed-total", "source", 1.0d, 1000.0d);
        verifyMetricRange$1(destCluster().aliveServers(), "reverse-connection-closed-total", "destination", 1.0d, 1000.0d);

        verifyKafkaMetric(
                "reverse-connection-failed-total",
                verifyKafkaMetric$default$2(),
                z,
                verifyKafkaMetric$default$4(),
                verifyKafkaMetric$default$5(),
                sourceCluster().aliveServers(),
                verifyKafkaMetric$default$7());

        // Single-entry tag maps: "mode" -> "source" and "mode" -> "destination".
        Map<String, String> sourceModeTags = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), "source")}));
        Map<String, String> destModeTags = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), "destination")}));

        // Totals over the alive servers of the source cluster.
        double sourceLinks = totalKafkaMetricValue(sourceCluster().aliveServers(), "link-count", sourceModeTags, totalKafkaMetricValue$default$4());
        double sourceActive = totalKafkaMetricValue(sourceCluster().aliveServers(), "reverse-connection-count", sourceModeTags, totalKafkaMetricValue$default$4());
        double sourceCreated = totalKafkaMetricValue(sourceCluster().aliveServers(), "reverse-connection-created-total", sourceModeTags, totalKafkaMetricValue$default$4());
        double sourceClosed = totalKafkaMetricValue(sourceCluster().aliveServers(), "reverse-connection-closed-total", sourceModeTags, totalKafkaMetricValue$default$4());

        // Totals over the alive servers of the destination cluster.
        double destLinks = totalKafkaMetricValue(destCluster().aliveServers(), "link-count", destModeTags, totalKafkaMetricValue$default$4());
        double destActive = totalKafkaMetricValue(destCluster().aliveServers(), "reverse-connection-count", destModeTags, totalKafkaMetricValue$default$4());
        double destCreated = totalKafkaMetricValue(destCluster().aliveServers(), "reverse-connection-created-total", destModeTags, totalKafkaMetricValue$default$4());
        double destClosed = totalKafkaMetricValue(destCluster().aliveServers(), "reverse-connection-closed-total", destModeTags, totalKafkaMetricValue$default$4());

        // Cross-checks: link counts must match exactly, connection counts may differ by up to 2.
        verifyRange$1(sourceLinks, sourceCluster().aliveServers().size(), 0.0d, "Source links vs source alive servers");
        verifyRange$1(sourceActive, destActive, 2.0d, "Dest vs source active connections");
        verifyRange$1(sourceActive, sourceCreated - sourceClosed, 2.0d, "Source active connections vs created-closed");
        verifyRange$1(destLinks, destCluster().aliveServers().size(), 0.0d, "Dest links vs dest alive servers");
        verifyRange$1(destActive, destCreated - destClosed, 2.0d, "Dest active connections vs created-closed");
    }

    /**
     * Filter predicate used by {@code testControllerChangeWithReverseConnections}: tells whether
     * {@code kafkaServer2} differs from the destination-cluster server registered under the
     * broker id of {@code kafkaServer}.
     *
     * @param sourceConnectionIntegrationTest test instance whose destination cluster is queried
     * @param kafkaServer server whose configured broker id selects the server to exclude
     * @param kafkaServer2 candidate server being tested; may be {@code null}
     * @return {@code true} when the candidate is not equal to the looked-up server
     */
    public static final /* synthetic */ boolean $anonfun$testControllerChangeWithReverseConnections$1(SourceConnectionIntegrationTest sourceConnectionIntegrationTest, KafkaServer kafkaServer, KafkaServer kafkaServer2) {
        int brokerId = kafkaServer.config().brokerId();
        KafkaServer lookedUp = sourceConnectionIntegrationTest.destCluster().serverWithBrokerId(brokerId);
        if (kafkaServer2 == null) {
            // A null candidate only "differs" when the lookup produced a real server.
            return lookedUp != null;
        }
        return !kafkaServer2.equals(lookedUp);
    }

    /**
     * Asserts that a Kafka metric, read with the tag {@code mode=<str2>}, lies within the
     * inclusive range {@code [d, d2]}.
     *
     * @param seq servers passed to {@code kafkaMetricValue}
     * @param str metric name
     * @param str2 value of the {@code "mode"} tag; also used in the failure message
     * @param d lowest accepted value (inclusive)
     * @param d2 highest accepted value (inclusive)
     */
    private final void verifyMetricRange$1(Seq seq, String str, String str2, double d, double d2) {
        // Single-entry tag map: "mode" -> str2.
        Map modeTags = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), str2)}));
        double observed = kafkaMetricValue(seq, str, modeTags, kafkaMetricValue$default$4());

        String tooLow = "Metric " + str + " too low for " + str2 + ": " + observed;
        Assertions.assertTrue(observed >= d, tooLow);

        String tooHigh = "Metric " + str + " too high for " + str2 + ": " + observed;
        Assertions.assertTrue(observed <= d2, tooHigh);
    }

    /**
     * Asserts that two values differ by no more than a tolerance.
     *
     * @param d first value
     * @param d2 second value
     * @param d3 largest accepted absolute difference (inclusive)
     * @param str label placed at the start of the failure message
     */
    private static final void verifyRange$1(double d, double d2, double d3, String str) {
        double difference = Math.abs(d - d2);
        String failureMessage = str + " : (" + d + ", " + d2 + ") not within " + d3 + " range";
        Assertions.assertTrue(difference <= d3, failureMessage);
    }
}
