package kafka.link;

import java.nio.ByteBuffer;
import java.util.Properties;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: CompactedMirrorTopicTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u00054A!\u0003\u0006\u0001\u001f!)A\u0003\u0001C\u0001+!)q\u0003\u0001C!1!)A\u0006\u0001C\u0005[!9a\u0007AI\u0001\n\u00139\u0004\"\u0002\"\u0001\t\u0003A\u0002\"B$\u0001\t\u0003A\u0002\"B%\u0001\t\u0013Q\u0005\"B+\u0001\t\u00131&\u0001G\"p[B\f7\r^3e\u001b&\u0014(o\u001c:U_BL7\rV3ti*\u00111\u0002D\u0001\u0005Y&t7NC\u0001\u000e\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\t\u0011\u0005E\u0011R\"\u0001\u0006\n\u0005MQ!AI!cgR\u0014\u0018m\u0019;DYV\u001cH/\u001a:MS:\\\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u0002-A\u0011\u0011\u0003A\u0001\u0006g\u0016$X\u000b\u001d\u000b\u00023A\u0011!$H\u0007\u00027)\tA$A\u0003tG\u0006d\u0017-\u0003\u0002\u001f7\t!QK\\5uQ\t\u0011\u0001\u0005\u0005\u0002\"U5\t!E\u0003\u0002$I\u0005\u0019\u0011\r]5\u000b\u0005\u00152\u0013a\u00026va&$XM\u001d\u0006\u0003O!\nQA[;oSRT\u0011!K\u0001\u0004_J<\u0017BA\u0016#\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\u000eg\u0016$X\u000b]\"mkN$XM]:\u0015\u0005eq\u0003bB\u0018\u0004!\u0003\u0005\r\u0001M\u0001\u0010G2,\u0017M\\3s\u0013:$XM\u001d<bYB\u0019!$M\u001a\n\u0005IZ\"AB(qi&|g\u000e\u0005\u0002\u001bi%\u0011Qg\u0007\u0002\u0004\u0013:$\u0018aF:fiV\u00038\t\\;ti\u0016\u00148\u000f\n3fM\u0006,H\u000e\u001e\u00132+\u0005A$F\u0001\u0019:W\u0005Q\u0004CA\u001eA\u001b\u0005a$BA\u001f?\u0003%)hn\u00195fG.,GM\u0003\u0002@7\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\u0005\u0005c$!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006AB/Z:u\u0007>l\u0007/Y2uK\u0012l\u0015N\u001d:peR{\u0007/[2)\u0005\u0015!\u0005CA\u0011F\u0013\t1%E\u0001\u0003UKN$\u0018a\n;fgR\u001cu.\u001c9bGR,G-T5se>\u0014Hk\u001c9jG^KG\u000f\u001b'pO\u000ecW-\u00198j]\u001eD#A\u0002#\u0002%]\f\u0017\u000e\u001e$pe2{wm\u00117fC:Lgn\u001a\u000b\u00043-\u0003\u0006\"\u0002'\b\u0001\u0004i\u0015aB2mkN$XM\u001d\t\u0003#9K!a\u0014\u0006\u0003-\rcWo\u001d;fe2Kgn\u001b+fgRD\u0015M\u001d8fgNDQ!U\u0004A\u0002I\u000baa\u001c4gg\u0016$\bC\u0001\u000eT\u0013\t!6D\u0001\u0003M_:<\u0017AD1qa\u0016tG-\u0011;PM\u001a\u001cX\r\u001e\u000b\u00043]K\u0006\"\u0002-\t\u0001\u0004\u0019\u0014!\u00039beRLG/[8o\u0011\u0015\t\u0006\u00021\u0001SQ\u0011\u00011LX0\u0011\u0005\u0005b\u0016BA/#\u0005\r!\u0016mZ\u0001\u0006m\u0006dW/Z\u0011\u0002A\u0006Y\u0011N\u001c;fOJ\fG/[8o\u0001")
/* loaded from: input_file:kafka/link/CompactedMirrorTopicTest.class */
public class CompactedMirrorTopicTest extends AbstractClusterLinkIntegrationTest {
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp() {
    }

    private void setUpClusters(Option<Object> option) {
        sourceCluster().producerConfig().setProperty("batch.size", "150");
        option.foreach(obj -> {
            return $anonfun$setUpClusters$1(this, BoxesRunTime.unboxToInt(obj));
        });
        super.setUp();
        numPartitions_$eq(2);
        Properties properties = new Properties();
        properties.put("cleanup.policy", "compact");
        properties.put("segment.bytes", "1000");
        properties.put("min.compaction.lag.ms", "5");
        properties.put("max.compaction.lag.ms", "2000");
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), properties);
    }

    private Option<Object> setUpClusters$default$1() {
        return None$.MODULE$;
    }

    @Test
    public void testCompactedMirrorTopic() {
        setUpClusters(setUpClusters$default$1());
        produceToSourceCluster(10);
        appendRecords$1(100L);
        appendRecords$1(4294967294L);
        destCluster().createDestClusterLink(linkName(), sourceCluster(), destCluster().createDestClusterLink$default$3(), destCluster().createDestClusterLink$default$4(), destCluster().createDestClusterLink$default$5(), destCluster().createDestClusterLink$default$6());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        appendRecords$1(10L);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    @Test
    public void testCompactedMirrorTopicWithLogCleaning() {
        setUpClusters(new Some(BoxesRunTime.boxToInteger(5)));
        destCluster().createDestClusterLink(linkName(), sourceCluster(), destCluster().createDestClusterLink$default$3(), destCluster().createDestClusterLink$default$4(), destCluster().createDestClusterLink$default$5(), destCluster().createDestClusterLink$default$6());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        produceRecords(sourceCluster().createProducer(sourceCluster().createProducer$default$1(), sourceCluster().createProducer$default$2(), sourceCluster().createProducer$default$3()), topic(), 500, obj -> {
            return $anonfun$testCompactedMirrorTopicWithLogCleaning$1(BoxesRunTime.unboxToInt(obj));
        });
        waitForLogCleaning(sourceCluster(), 400L);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        waitForLogCleaning(destCluster(), 400L);
    }

    private void waitForLogCleaning(ClusterLinkTestHarness clusterLinkTestHarness, long j) {
        partitions().foreach(topicPartition -> {
            return BoxesRunTime.boxToBoolean($anonfun$waitForLogCleaning$1(clusterLinkTestHarness, j, topicPartition));
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void appendAtOffset(int i, long j) {
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        ByteBufferOutputStream byteBufferOutputStream = new ByteBufferOutputStream(allocate);
        long currentTimeMillis = System.currentTimeMillis();
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(byteBufferOutputStream, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, j, currentTimeMillis, 0L, (short) 0, 0, false, false, 0, allocate.capacity());
        byte[] bytes = new StringBuilder(4).append("key-").append(j).toString().getBytes();
        byte[] bytes2 = new StringBuilder(6).append("value-").append(j).toString().getBytes();
        memoryRecordsBuilder.append(currentTimeMillis, bytes, bytes2);
        MemoryRecords build = memoryRecordsBuilder.build();
        TopicPartition topicPartition = new TopicPartition(topic(), i);
        LogManager logManager = sourceCluster().partitionLeader(topicPartition).logManager();
        ((AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get()).appendAsFollower(build);
        producedRecords().$plus$eq(new AbstractClusterLinkIntegrationTest.SourceRecord(this, topic(), i, bytes, bytes2, j));
    }

    public static final /* synthetic */ Object $anonfun$setUpClusters$1(CompactedMirrorTopicTest compactedMirrorTopicTest, int i) {
        compactedMirrorTopicTest.sourceCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.toString(i));
        compactedMirrorTopicTest.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.toString(i));
        return compactedMirrorTopicTest.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogDeleteDelayMsProp(), Integer.toString(i));
    }

    private final void appendRecords$1(long j) {
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).foreach$mVc$sp(i -> {
            long nextOffset = this.nextOffset(i);
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).foreach$mVc$sp(i -> {
                this.appendAtOffset(i, nextOffset + (i * j) + i);
            });
        });
    }

    public static final /* synthetic */ String $anonfun$testCompactedMirrorTopicWithLogCleaning$1(int i) {
        return new StringBuilder(4).append("key ").append(i % 5).toString();
    }

    public static final /* synthetic */ boolean $anonfun$waitForLogCleaning$1(ClusterLinkTestHarness clusterLinkTestHarness, long j, TopicPartition topicPartition) {
        return clusterLinkTestHarness.partitionLeader(topicPartition).logManager().cleaner().awaitCleaned(topicPartition, j, 15000L);
    }
}
