package kafka.link;

import io.confluent.kafka.replication.push.ReplicationState;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.utils.TestInfoUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: CompactedMirrorTopicTest.scala */
@Tags({@Tag("integration"), @Tag("bazel:shard_count:4")})
@ScalaSignature(bytes = "\u0006\u0005\u0005\u0015c\u0001B\u0006\r\u0001EAQA\u0006\u0001\u0005\u0002]A\u0011\"\u0007\u0001A\u0002\u0003\u0005\u000b\u0015\u0002\u000e\t\u000b\u0019\u0002A\u0011I\u0014\t\u000bQ\u0002A\u0011B\u001b\t\u000fy\u0002\u0011\u0013!C\u0005\u007f!)!\n\u0001C\u0001\u0017\")Q\u000f\u0001C\u0001m\")Q\u0010\u0001C\u0001}\"9\u0011\u0011\u0003\u0001\u0005\n\u0005M\u0001bBA\u0015\u0001\u0011%\u00111\u0006\u0002\u0019\u0007>l\u0007/Y2uK\u0012l\u0015N\u001d:peR{\u0007/[2UKN$(BA\u0007\u000f\u0003\u0011a\u0017N\\6\u000b\u0003=\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001%A\u00111\u0003F\u0007\u0002\u0019%\u0011Q\u0003\u0004\u0002#\u0003\n\u001cHO]1di\u000ecWo\u001d;fe2Kgn[%oi\u0016<'/\u0019;j_:$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005A\u0002CA\n\u0001\u0003%yF/Z:u\u0013:4w\u000e\u0005\u0002\u001cI5\tAD\u0003\u0002\u001e=\u0005\u0019\u0011\r]5\u000b\u0005}\u0001\u0013a\u00026va&$XM\u001d\u0006\u0003C\t\nQA[;oSRT\u0011aI\u0001\u0004_J<\u0017BA\u0013\u001d\u0005!!Vm\u001d;J]\u001a|\u0017!B:fiV\u0003HC\u0001\u0015/!\tIC&D\u0001+\u0015\u0005Y\u0013!B:dC2\f\u0017BA\u0017+\u0005\u0011)f.\u001b;\t\u000b=\u001a\u0001\u0019\u0001\u000e\u0002\u0011Q,7\u000f^%oM>D#aA\u0019\u0011\u0005m\u0011\u0014BA\u001a\u001d\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\u000eg\u0016$X\u000b]\"mkN$XM]:\u0015\u0005!2\u0004bB\u001c\u0005!\u0003\u0005\r\u0001O\u0001\u0010G2,\u0017M\\3s\u0013:$XM\u001d<bYB\u0019\u0011&O\u001e\n\u0005iR#AB(qi&|g\u000e\u0005\u0002*y%\u0011QH\u000b\u0002\u0004\u0013:$\u0018aF:fiV\u00038\t\\;ti\u0016\u00148\u000f\n3fM\u0006,H\u000e\u001e\u00132+\u0005\u0001%F\u0001\u001dBW\u0005\u0011\u0005CA\"I\u001b\u0005!%BA#G\u0003%)hn\u00195fG.,GM\u0003\u0002HU\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\u0005%#%!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006AB/Z:u\u0007>l\u0007/Y2uK\u0012l\u0015N\u001d:peR{\u0007/[2\u0015\t!b\u0015L\u0018\u0005\u0006\u001b\u001a\u0001\rAT\u0001\u0007cV|'/^7\u0011\u0005=3fB\u0001)U!\t\t&&D\u0001S\u0015\t\u0019\u0006#\u0001\u0004=e>|GOP\u000
5\u0003+*\na\u0001\u0015:fI\u00164\u0017BA,Y\u0005\u0019\u0019FO]5oO*\u0011QK\u000b\u0005\u00065\u001a\u0001\raW\u0001\fG>|'\u000fZ5oCR|'\u000f\u0005\u0002*9&\u0011QL\u000b\u0002\b\u0005>|G.Z1o\u0011\u0015yf\u00011\u0001O\u0003AawnY1m%\u0016\u0004H.[2bi&|g\u000e\u000b\u0003\u0007C\u001eD\u0007C\u00012f\u001b\u0005\u0019'B\u00013\u001f\u0003\u0019\u0001\u0018M]1ng&\u0011am\u0019\u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\u0018\u0001\u00028b[\u0016\f\u0013![\u0001>w\u0012L7\u000f\u001d7bs:\u000bW.Z?/cV|'/^7>wBjhfY8pe\u0012Lg.\u0019;pevZ\u0018' \u0018m_\u000e\fGNU3qY&\u001c\u0017\r^5p]vZ(' \u0015\u0005\r-\f(\u000f\u0005\u0002m_6\tQN\u0003\u0002oG\u0006A\u0001O]8wS\u0012,'/\u0003\u0002q[\naQ*\u001a;i_\u0012\u001cv.\u001e:dK\u0006)a/\u00197vK2\n1/I\u0001u\u0003!\nXo\u001c:v[\u000e{wN\u001d3j]\u0006$xN\u001d*fa2L7-\u0019;j_:\u001cu.\u001c2j]\u0006$\u0018n\u001c8t\u0003\u001d\"Xm\u001d;D_6\u0004\u0018m\u0019;fI6K'O]8s)>\u0004\u0018nY,ji\"dunZ\"mK\u0006t\u0017N\\4\u0015\t!:\b0\u001f\u0005\u0006\u001b\u001e\u0001\rA\u0014\u0005\u00065\u001e\u0001\ra\u0017\u0005\u0006?\u001e\u0001\rA\u0014\u0015\u0005\u000f\u0005<\u0007\u000e\u000b\u0003\bWFdH&A:\u0002AQ,7\u000f^'jeJ|'oQ8na\u0006\u001cG/\u001a3U_BL7mV5uQ\u001e\u000b\u0007o\u001d\u000b\u0005Q}\f\t\u0001C\u0003N\u0011\u0001\u0007a\nC\u0003[\u0011\u0001\u00071\fK\u0003\tC\u001e\f)!\t\u0002\u0002\b\u0005A3\u0010Z5ta2\f\u0017PT1nKvt\u0013/^8sk6l4\u0010M?/G>|'\u000fZ5oCR|'/P>2{\"*\u0001b[9\u0002\f1\u0012\u0011QB\u0011\u0003\u0003\u001f\tq\"\u00197m\u0007>l'-\u001b8bi&|gn]\u0001\u0013o\u0006LGOR8s\u0019><7\t\\3b]&tw\rF\u0003)\u0003+\ty\u0002C\u0004\u0002\u0018%\u0001\r!!\u0007\u0002\u000f\rdWo\u001d;feB\u00191#a\u0007\n\u0007\u0005uAB\u0001\fDYV\u001cH/\u001a:MS:\\G+Z:u\u0011\u0006\u0014h.Z:t\u0011\u001d\t\t#\u0003a\u0001\u0003G\taa\u001c4gg\u0016$\bcA\u0015\u0002&%\u0019\u0011q\u0005\u0016\u0003\t1{gnZ\u0001\u000fCB\u0004XM\u001c3Bi>3gm]3u)\u0015A\u0013QFA\u0019\u0011\u0019\tyC\u0003a\u0001w\u0005I\u0001/\u0019:uSRLwN\u001
c\u0005\b\u0003CQ\u0001\u0019AA\u0012Q\u0019\u0001\u0011QG9\u0002<A\u00191$a\u000e\n\u0007\u0005eBDA\u0002UC\u001e\f#!!\u0010\u0002\u0017%tG/Z4sCRLwN\u001c\u0015\u0007\u0001\u0005U\u0012/!\u0011\"\u0005\u0005\r\u0013a\u00052bu\u0016d'h\u001d5be\u0012|6m\\;oij\"\u0004")
/* loaded from: input_file:kafka/link/CompactedMirrorTopicTest.class */
/**
 * Integration tests that mirror a topic created with {@code cleanup.policy=compact} from a
 * source cluster to a destination cluster over a cluster link.
 *
 * <p>This file is decompiled from Scala. The {@code $anonfun$...} and {@code ...$default$N}
 * members are compiler-generated artifacts and are kept so the class shape stays unchanged.
 */
public class CompactedMirrorTopicTest extends AbstractClusterLinkIntegrationTest {
    /** Test info captured in {@link #setUp(TestInfo)} and handed to {@code super.setUp} later. */
    private TestInfo _testInfo;

    /**
     * Records the test info and, when neither cluster harness exists yet, creates both.
     *
     * <p>KRaft runs use PLAINTEXT on both clusters; all other runs use SASL_SSL for the source
     * and SASL_PLAINTEXT for the destination. Note that {@code super.setUp} is not called here;
     * it is deferred to {@link #setUpClusters(Option)}.
     *
     * @param testInfo JUnit test info for the running test
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        this._testInfo = testInfo;
        // Evaluated before the null checks, exactly as in the original condition.
        boolean kraft = TestInfoUtils$.MODULE$.isKRaft(this._testInfo);
        if (sourceCluster() != null || destCluster() != null) {
            // Harnesses are only created when both are still unset.
            return;
        }
        SecurityProtocol sourceProtocol = kraft ? SecurityProtocol.PLAINTEXT : SecurityProtocol.SASL_SSL;
        SecurityProtocol destProtocol = kraft ? SecurityProtocol.PLAINTEXT : SecurityProtocol.SASL_PLAINTEXT;
        // NOTE(review): the trailing int arguments (0 / 100 and 2) are passed through unchanged;
        // their meaning is defined by ClusterLinkTestHarness and is not visible here.
        sourceCluster_$eq(new ClusterLinkTestHarness(sourceProtocol, None$.MODULE$, 0, 2));
        destCluster_$eq(new ClusterLinkTestHarness(destProtocol, None$.MODULE$, 100, 2));
    }

    /**
     * Configures both clusters, runs the superclass set-up, and creates the compacted source topic.
     *
     * <p>The producer batch size is set to 150, the optional interval is applied to the broker
     * configs, and the topic is created with two partitions, {@code segment.bytes=1000} and
     * compaction lag bounds of 5 ms / 2000 ms.
     *
     * @param option optional interval in milliseconds applied to
     *               {@code log.retention.check.interval.ms} on both clusters and to the log
     *               delete delay on the destination; nothing is applied when empty
     */
    private void setUpClusters(Option<Object> option) {
        sourceCluster().producerConfig().setProperty("batch.size", "150");
        option.foreach(obj -> {
            return $anonfun$setUpClusters$1(this, BoxesRunTime.unboxToInt(obj));
        });
        super.setUp(this._testInfo);
        numPartitions_$eq(2);

        Properties topicConfig = new Properties();
        topicConfig.put("cleanup.policy", "compact");
        topicConfig.put("segment.bytes", "1000");
        topicConfig.put("min.compaction.lag.ms", "5");
        topicConfig.put("max.compaction.lag.ms", "2000");

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), topicConfig, source.createTopic$default$5(), source.createTopic$default$6());
    }

    /** Compiler-generated default for the {@code setUpClusters} argument: no interval override. */
    private Option<Object> setUpClusters$default$1() {
        return None$.MODULE$;
    }

    /**
     * Mirrors a compacted topic whose source log contains offset gaps.
     *
     * <p>Records are appended directly to the source log with gaps of 100 and 4294967294 before
     * the link is created, and with a gap of 10 after mirroring has started; the mirror is then
     * verified against the source.
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testCompactedMirrorTopic(String str, boolean z, String str2) {
        setUpClusters(None$.MODULE$);
        produceToSourceCluster(10);
        appendRecords$1(100L);
        appendRecords$1(4294967294L);

        ClusterLinkTestHarness linkOwner = destCluster();
        linkOwner.createDestClusterLink(linkName(), sourceCluster(), linkOwner.createDestClusterLink$default$3(), linkOwner.createDestClusterLink$default$4(), linkOwner.createDestClusterLink$default$5(), linkOwner.createDestClusterLink$default$6());
        ClusterLinkTestHarness mirrorOwner = destCluster();
        mirrorOwner.linkTopic(topic(), replicationFactor(), linkName(), mirrorOwner.linkTopic$default$4(), mirrorOwner.linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        sourceCluster().maybeWaitUntilReplicasInPushMode(new TopicPartition(topic(), 0));
        // The destination replicas of partition 0 must not all be in PUSH replication mode.
        Assertions.assertFalse(TestUtils$.MODULE$.hasAllReplicasInReplicationMode(destCluster().brokers(), new TopicPartition(topic(), 0), ReplicationState.Mode.PUSH));

        appendRecords$1(10L);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), false);
    }

    /**
     * Produces 500 records over five distinct keys and waits for log cleaning on the source,
     * then for the mirror to catch up, then for log cleaning on the destination.
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testCompactedMirrorTopicWithLogCleaning(String str, boolean z, String str2) {
        setUpClusters(new Some(BoxesRunTime.boxToInteger(5)));

        ClusterLinkTestHarness linkOwner = destCluster();
        linkOwner.createDestClusterLink(linkName(), sourceCluster(), linkOwner.createDestClusterLink$default$3(), linkOwner.createDestClusterLink$default$4(), linkOwner.createDestClusterLink$default$5(), linkOwner.createDestClusterLink$default$6());
        ClusterLinkTestHarness mirrorOwner = destCluster();
        mirrorOwner.linkTopic(topic(), replicationFactor(), linkName(), mirrorOwner.linkTopic$default$4(), mirrorOwner.linkTopic$default$5());

        ClusterLinkTestHarness source = sourceCluster();
        produceRecords(source.createProducer(source.createProducer$default$1(), source.createProducer$default$2(), source.createProducer$default$3()), topic(), 500, obj -> {
            return $anonfun$testCompactedMirrorTopicWithLogCleaning$1(BoxesRunTime.unboxToInt(obj));
        }, produceRecords$default$5(), produceRecords$default$6(), produceRecords$default$7());

        waitForLogCleaning(sourceCluster(), 400L);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        waitForLogCleaning(destCluster(), 400L);
    }

    /**
     * Cleans the source log before the mirror topic is linked, then waits for the mirror.
     *
     * <p>Batches with unique keys (2 records each) are interleaved with batches of 500 records
     * spread over five repeating keys; the source is cleaned up to offset 900 before linking.
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorCompactedTopicWithGaps(String str, boolean z) {
        // NOTE(review): setUpClusters below resets numPartitions to 2 after super.setUp runs,
        // so this value is only in effect during super.setUp - confirm that is intended.
        numPartitions_$eq(1);
        setUpClusters(new Some(BoxesRunTime.boxToInteger(5)));

        ClusterLinkTestHarness linkOwner = destCluster();
        linkOwner.createDestClusterLink(linkName(), sourceCluster(), linkOwner.createDestClusterLink$default$3(), linkOwner.createDestClusterLink$default$4(), 2L, linkOwner.createDestClusterLink$default$6());

        ClusterLinkTestHarness source = sourceCluster();
        KafkaProducer<byte[], byte[]> producer = source.createProducer(source.createProducer$default$1(), source.createProducer$default$2(), source.createProducer$default$3());
        produceRecords(producer, topic(), 2, obj -> {
            return $anonfun$testMirrorCompactedTopicWithGaps$1(BoxesRunTime.unboxToInt(obj));
        }, produceRecords$default$5(), produceRecords$default$6(), produceRecords$default$7());
        produceRecords(producer, topic(), 500, obj2 -> {
            return $anonfun$testMirrorCompactedTopicWithGaps$2(BoxesRunTime.unboxToInt(obj2));
        }, produceRecords$default$5(), produceRecords$default$6(), produceRecords$default$7());
        produceRecords(producer, topic(), 2, obj3 -> {
            return $anonfun$testMirrorCompactedTopicWithGaps$3(BoxesRunTime.unboxToInt(obj3));
        }, produceRecords$default$5(), produceRecords$default$6(), produceRecords$default$7());
        produceRecords(producer, topic(), 500, obj4 -> {
            return $anonfun$testMirrorCompactedTopicWithGaps$4(BoxesRunTime.unboxToInt(obj4));
        }, produceRecords$default$5(), produceRecords$default$6(), produceRecords$default$7());
        produceRecords(producer, topic(), 2, obj5 -> {
            return $anonfun$testMirrorCompactedTopicWithGaps$5(BoxesRunTime.unboxToInt(obj5));
        }, produceRecords$default$5(), produceRecords$default$6(), produceRecords$default$7());

        waitForLogCleaning(sourceCluster(), 900L);

        ClusterLinkTestHarness mirrorOwner = destCluster();
        mirrorOwner.linkTopic(topic(), replicationFactor(), linkName(), mirrorOwner.linkTopic$default$4(), mirrorOwner.linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    /**
     * Calls {@code awaitCleaned} on the partition leader's log cleaner for every test partition,
     * with a 15000 ms wait.
     *
     * <p>NOTE(review): the boolean returned by {@code awaitCleaned} is discarded by
     * {@code foreach}, so a timeout does not fail the test here - confirm this is intended.
     *
     * @param clusterLinkTestHarness cluster whose partition leaders are queried
     * @param j                      offset passed to {@code awaitCleaned}
     */
    private void waitForLogCleaning(ClusterLinkTestHarness clusterLinkTestHarness, long j) {
        partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).foreach(topicPartition -> {
            return BoxesRunTime.boxToBoolean($anonfun$waitForLogCleaning$1(clusterLinkTestHarness, j, topicPartition));
        });
    }

    /**
     * Writes a single record at an explicit offset directly into the source leader's log and
     * registers it in {@code producedRecords()}.
     *
     * <p>The batch is appended with {@code appendAsFollower}, so it does not go through a
     * producer. Key and value are {@code "key-<offset>"} and {@code "value-<offset>"}.
     *
     * @param i partition index within the test topic
     * @param j offset used as the base offset of the single-record batch
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void appendAtOffset(int i, long j) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(buffer);
        long now = System.currentTimeMillis();
        // Argument order follows Kafka's MemoryRecordsBuilder constructor; values are unchanged.
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferStream, (byte) 2, Compression.NONE, TimestampType.CREATE_TIME, j, now, 0L, (short) 0, 0, false, false, 0, buffer.capacity());
        // Explicit charset so the encoded bytes do not depend on the JVM's platform default.
        byte[] key = ("key-" + j).getBytes(StandardCharsets.UTF_8);
        byte[] value = ("value-" + j).getBytes(StandardCharsets.UTF_8);
        builder.append(now, key, value);
        MemoryRecords records = builder.build();

        TopicPartition topicPartition = new TopicPartition(topic(), i);
        LogManager logManager = sourceCluster().partitionLeader(topicPartition).logManager();
        ((AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get()).appendAsFollower(records);
        producedRecords().$plus$eq(new AbstractClusterLinkIntegrationTest.SourceRecord(this, topic(), i, key, value, j));
    }

    /**
     * Synthetic lambda body: applies the interval to {@code log.retention.check.interval.ms} on
     * both clusters and to the log delete delay on the destination cluster.
     *
     * @return the result of the last {@code setProperty} call
     */
    public static final /* synthetic */ Object $anonfun$setUpClusters$1(CompactedMirrorTopicTest compactedMirrorTopicTest, int i) {
        String interval = Integer.toString(i);
        compactedMirrorTopicTest.sourceCluster().serverConfig().setProperty("log.retention.check.interval.ms", interval);
        compactedMirrorTopicTest.destCluster().serverConfig().setProperty("log.retention.check.interval.ms", interval);
        return compactedMirrorTopicTest.destCluster().serverConfig().setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, interval);
    }

    /**
     * Appends ten records to every partition, spacing their offsets by the given gap.
     *
     * <p>NOTE(review): the decompiled source named both nested lambda parameters {@code i},
     * which does not compile and hides which value is the partition and which is the record
     * index. The mapping below (partition as the first argument of {@code appendAtOffset},
     * record index in the offset arithmetic) is inferred from the
     * {@code appendAtOffset(int, long)} signature - confirm against the Scala source.
     *
     * @param j gap multiplier applied to the record index when computing each offset
     */
    private final void appendRecords$1(long j) {
        // Read once, matching the single evaluation of the original range bound.
        int partitionCount = numPartitions();
        for (int partition = 0; partition < partitionCount; partition++) {
            long baseOffset = this.nextOffset(partition);
            for (int index = 1; index <= 10; index++) {
                this.appendAtOffset(partition, baseOffset + (index * j) + index);
            }
        }
    }

    /** Synthetic key function: cycles through five keys, {@code "key 0"} to {@code "key 4"}. */
    public static final /* synthetic */ String $anonfun$testCompactedMirrorTopicWithLogCleaning$1(int i) {
        return "key " + (i % 5);
    }

    /** Synthetic key function: one distinct {@code "key1 <i>"} per record. */
    public static final /* synthetic */ String $anonfun$testMirrorCompactedTopicWithGaps$1(int i) {
        return "key1 " + i;
    }

    /** Synthetic key function: cycles through five {@code "key2 <n>"} keys. */
    public static final /* synthetic */ String $anonfun$testMirrorCompactedTopicWithGaps$2(int i) {
        return "key2 " + (i % 5);
    }

    /** Synthetic key function: one distinct {@code "key3 <i>"} per record. */
    public static final /* synthetic */ String $anonfun$testMirrorCompactedTopicWithGaps$3(int i) {
        return "key3 " + i;
    }

    /** Synthetic key function: cycles through five {@code "key4 <n>"} keys. */
    public static final /* synthetic */ String $anonfun$testMirrorCompactedTopicWithGaps$4(int i) {
        return "key4 " + (i % 5);
    }

    /** Synthetic key function: one distinct {@code "key5 <i>"} per record. */
    public static final /* synthetic */ String $anonfun$testMirrorCompactedTopicWithGaps$5(int i) {
        return "key5 " + i;
    }

    /**
     * Synthetic lambda body: calls {@code awaitCleaned} on the log cleaner of the partition's
     * leader with a 15000 ms wait and returns its result.
     */
    public static final /* synthetic */ boolean $anonfun$waitForLogCleaning$1(ClusterLinkTestHarness clusterLinkTestHarness, long j, TopicPartition topicPartition) {
        return clusterLinkTestHarness.partitionLeader(topicPartition).logManager().cleaner().awaitCleaned(topicPartition, j, 15000L);
    }
}
