package kafka.link;

import java.nio.ByteBuffer;
import java.util.Properties;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: CompactedMirrorTopicTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005%4AAC\u0006\u0001!!)Q\u0003\u0001C\u0001-!I\u0001\u0004\u0001a\u0001\u0002\u0003\u0006K!\u0007\u0005\u0006K\u0001!\tE\n\u0005\u0006g\u0001!I\u0001\u000e\u0005\b{\u0001\t\n\u0011\"\u0003?\u0011\u0015I\u0005\u0001\"\u0001K\u0011\u0015y\u0005\u0001\"\u0001K\u0011\u0015\t\u0006\u0001\"\u0003S\u0011\u0015i\u0006\u0001\"\u0003_\u0005a\u0019u.\u001c9bGR,G-T5se>\u0014Hk\u001c9jGR+7\u000f\u001e\u0006\u0003\u00195\tA\u0001\\5oW*\ta\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001\t\u0002C\u0001\n\u0014\u001b\u0005Y\u0011B\u0001\u000b\f\u0005\t\n%m\u001d;sC\u000e$8\t\\;ti\u0016\u0014H*\u001b8l\u0013:$Xm\u001a:bi&|g\u000eV3ti\u00061A(\u001b8jiz\"\u0012a\u0006\t\u0003%\u0001\t\u0011b\u0018;fgRLeNZ8\u0011\u0005i\u0019S\"A\u000e\u000b\u0005qi\u0012aA1qS*\u0011adH\u0001\bUV\u0004\u0018\u000e^3s\u0015\t\u0001\u0013%A\u0003kk:LGOC\u0001#\u0003\ry'oZ\u0005\u0003Im\u0011\u0001\u0002V3ti&sgm\\\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0003O5\u0002\"\u0001K\u0016\u000e\u0003%R\u0011AK\u0001\u0006g\u000e\fG.Y\u0005\u0003Y%\u0012A!\u00168ji\")af\u0001a\u00013\u0005AA/Z:u\u0013:4w\u000e\u000b\u0002\u0004aA\u0011!$M\u0005\u0003em\u0011!BQ3g_J,W)Y2i\u00035\u0019X\r^+q\u00072,8\u000f^3sgR\u0011q%\u000e\u0005\bm\u0011\u0001\n\u00111\u00018\u0003=\u0019G.Z1oKJLe\u000e^3sm\u0006d\u0007c\u0001\u00159u%\u0011\u0011(\u000b\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005!Z\u0014B\u0001\u001f*\u0005\rIe\u000e^\u0001\u0018g\u0016$X\u000b]\"mkN$XM]:%I\u00164\u0017-\u001e7uIE*\u0012a\u0010\u0016\u0003o\u0001[\u0013!\u0011\t\u0003\u0005\u001ek\u0011a\u0011\u0006\u0003\t\u0016\u000b\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0005\u0019K\u0013AC1o]>$\u0018\r^5p]&\u0011\u0001j\u0011\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017\u0001\u0007;fgR\u001cu.\u001c9bGR,G-T5se>\u0014Hk\u001c9jGR\tq\u0005\u000b\u0002\u0007\u0019B\u0011!$T\u0005\u0003\u001dn\u0011A\u0001V3ti\u00069C/Z:u\u0007>l\u0007/Y2uK\u0012l\u0015N\u001d:peR{\u0007/[2XSRDGj\\4DY\u0016\fg.\u001b8hQ\t9A*\u0001\nxC&$hi\u001c:M_\u001e\u001cE.Z1oS:<GcA\u0014T1\")A\u000b\u0003a\u0001+\u000691\r\\;ti\u0016\u0014\bC\u0001\nW\u0013\t96B\u0001\fDYV\u001cH/\u001a:MS:\\G+Z:u\u0011\u0006\u0014h.Z:t\u0011\u0015I\u0006\u00021\u0001[\u0003\u0019ygMZ:fiB\u0011\u0001fW\u0005\u00039&\u0012A\u0001T8oO\u0006q\u0011\r\u001d9f]\u0012\fEo\u00144gg\u0016$HcA\u0014`C\")\u0001-\u0003a\u0001u\u0005I\u0001/\u0019:uSRLwN\u001c\u0005\u00063&\u0001\rA\u0017\u0015\u0005\u0001\r4w\r\u0005\u0002\u001bI&\u0011Qm\u0007\u0002\u0004)\u0006<\u0017!\u0002<bYV,\u0017%\u00015\u0002\u0017%tG/Z4sCRLwN\u001c")
/* loaded from: input_file:kafka/link/CompactedMirrorTopicTest.class */
public class CompactedMirrorTopicTest extends AbstractClusterLinkIntegrationTest {
    private TestInfo _testInfo;

    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        this._testInfo = testInfo;
    }

    private void setUpClusters(Option<Object> option) {
        sourceCluster().producerConfig().setProperty("batch.size", "150");
        option.foreach(obj -> {
            return $anonfun$setUpClusters$1(this, BoxesRunTime.unboxToInt(obj));
        });
        super.setUp(this._testInfo);
        numPartitions_$eq(2);
        Properties properties = new Properties();
        properties.put("cleanup.policy", "compact");
        properties.put("segment.bytes", "1000");
        properties.put("min.compaction.lag.ms", "5");
        properties.put("max.compaction.lag.ms", "2000");
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), properties, sourceCluster().createTopic$default$5());
    }

    private Option<Object> setUpClusters$default$1() {
        return None$.MODULE$;
    }

    @Test
    public void testCompactedMirrorTopic() {
        setUpClusters(None$.MODULE$);
        produceToSourceCluster(10);
        appendRecords$1(100L);
        appendRecords$1(4294967294L);
        destCluster().createDestClusterLink(linkName(), sourceCluster(), destCluster().createDestClusterLink$default$3(), destCluster().createDestClusterLink$default$4(), destCluster().createDestClusterLink$default$5(), destCluster().createDestClusterLink$default$6());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4(), destCluster().linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2(), waitForMirror$default$3());
        appendRecords$1(10L);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
    }

    @Test
    public void testCompactedMirrorTopicWithLogCleaning() {
        setUpClusters(new Some(BoxesRunTime.boxToInteger(5)));
        destCluster().createDestClusterLink(linkName(), sourceCluster(), destCluster().createDestClusterLink$default$3(), destCluster().createDestClusterLink$default$4(), destCluster().createDestClusterLink$default$5(), destCluster().createDestClusterLink$default$6());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4(), destCluster().linkTopic$default$5());
        produceRecords(sourceCluster().createProducer(sourceCluster().createProducer$default$1(), sourceCluster().createProducer$default$2(), sourceCluster().createProducer$default$3()), topic(), 500, obj -> {
            return $anonfun$testCompactedMirrorTopicWithLogCleaning$1(BoxesRunTime.unboxToInt(obj));
        });
        waitForLogCleaning(sourceCluster(), 400L);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2(), waitForMirror$default$3());
        waitForLogCleaning(destCluster(), 400L);
    }

    private void waitForLogCleaning(ClusterLinkTestHarness clusterLinkTestHarness, long j) {
        partitions(partitions$default$1()).foreach(topicPartition -> {
            return BoxesRunTime.boxToBoolean($anonfun$waitForLogCleaning$1(clusterLinkTestHarness, j, topicPartition));
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void appendAtOffset(int i, long j) {
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        ByteBufferOutputStream byteBufferOutputStream = new ByteBufferOutputStream(allocate);
        long currentTimeMillis = System.currentTimeMillis();
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(byteBufferOutputStream, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, j, currentTimeMillis, 0L, (short) 0, 0, false, false, 0, allocate.capacity());
        byte[] bytes = new StringBuilder(4).append("key-").append(j).toString().getBytes();
        byte[] bytes2 = new StringBuilder(6).append("value-").append(j).toString().getBytes();
        memoryRecordsBuilder.append(currentTimeMillis, bytes, bytes2);
        MemoryRecords build = memoryRecordsBuilder.build();
        TopicPartition topicPartition = new TopicPartition(topic(), i);
        LogManager logManager = sourceCluster().partitionLeader(topicPartition).logManager();
        AbstractLog abstractLog = (AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get();
        abstractLog.appendAsFollower(build, abstractLog.appendAsFollower$default$2());
        producedRecords().$plus$eq(new AbstractClusterLinkIntegrationTest.SourceRecord(this, topic(), i, bytes, bytes2, j));
    }

    public static final /* synthetic */ Object $anonfun$setUpClusters$1(CompactedMirrorTopicTest compactedMirrorTopicTest, int i) {
        compactedMirrorTopicTest.sourceCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.toString(i));
        compactedMirrorTopicTest.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.toString(i));
        return compactedMirrorTopicTest.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogDeleteDelayMsProp(), Integer.toString(i));
    }

    private final void appendRecords$1(long j) {
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).foreach$mVc$sp(i -> {
            long nextOffset = this.nextOffset(i);
            RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 10).foreach$mVc$sp(i -> {
                this.appendAtOffset(i, nextOffset + (i * j) + i);
            });
        });
    }

    public static final /* synthetic */ String $anonfun$testCompactedMirrorTopicWithLogCleaning$1(int i) {
        return new StringBuilder(4).append("key ").append(i % 5).toString();
    }

    public static final /* synthetic */ boolean $anonfun$waitForLogCleaning$1(ClusterLinkTestHarness clusterLinkTestHarness, long j, TopicPartition topicPartition) {
        return clusterLinkTestHarness.partitionLeader(topicPartition).logManager().cleaner().awaitCleaned(topicPartition, j, 15000L);
    }
}
