package kafka.link;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Properties;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.test.IntegrationTest;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.immutable.Range;
import scala.collection.immutable.Range$;
import scala.collection.mutable.Buffer;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: CompactedTopicMirrorTest.scala */
@ScalaSignature(bytes = "\u0006\u0005-4A!\u0003\u0006\u0001\u001f!)A\u0003\u0001C\u0001+!)q\u0003\u0001C!1!)\u0001\u0006\u0001C\u0005S!9!\u0007AI\u0001\n\u0013\u0019\u0004\"\u0002 \u0001\t\u0003A\u0002\"B\"\u0001\t\u0003A\u0002\"B#\u0001\t\u00131\u0005\"B)\u0001\t\u0013\u0011&\u0001G\"p[B\f7\r^3e)>\u0004\u0018nY'jeJ|'\u000fV3ti*\u00111\u0002D\u0001\u0005Y&t7NC\u0001\u000e\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\t\u0011\u0005E\u0011R\"\u0001\u0006\n\u0005MQ!AI!cgR\u0014\u0018m\u0019;DYV\u001cH/\u001a:MS:\\\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u0002-A\u0011\u0011\u0003A\u0001\u0006g\u0016$X\u000b\u001d\u000b\u00023A\u0011!$H\u0007\u00027)\tA$A\u0003tG\u0006d\u0017-\u0003\u0002\u001f7\t!QK\\5uQ\t\u0011\u0001\u0005\u0005\u0002\"M5\t!E\u0003\u0002$I\u0005)!.\u001e8ji*\tQ%A\u0002pe\u001eL!a\n\u0012\u0003\r\t+gm\u001c:f\u00035\u0019X\r^+q\u00072,8\u000f^3sgR\u0011\u0011D\u000b\u0005\bW\r\u0001\n\u00111\u0001-\u0003=\u0019G.Z1oKJLe\u000e^3sm\u0006d\u0007c\u0001\u000e._%\u0011af\u0007\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005i\u0001\u0014BA\u0019\u001c\u0005\rIe\u000e^\u0001\u0018g\u0016$X\u000b]\"mkN$XM]:%I\u00164\u0017-\u001e7uIE*\u0012\u0001\u000e\u0016\u0003YUZ\u0013A\u000e\t\u0003oqj\u0011\u0001\u000f\u0006\u0003si\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0005mZ\u0012AC1o]>$\u0018\r^5p]&\u0011Q\b\u000f\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017\u0001\u0007;fgR\u001cu.\u001c9bGR,G\rV8qS\u000el\u0015N\u001d:pe\"\u0012Q\u0001\u0011\t\u0003C\u0005K!A\u0011\u0012\u0003\tQ+7\u000f^\u0001(i\u0016\u001cHoQ8na\u0006\u001cG/\u001a3U_BL7-T5se>\u0014x+\u001b;i\u0019><7\t\\3b]&tw\r\u000b\u0002\u0007\u0001\u0006\u0011r/Y5u\r>\u0014Hj\\4DY\u0016\fg.\u001b8h)\rIr\t\u0014\u0005\u0006\u0011\u001e\u0001\r!S\u0001\bG2,8\u000f^3s!\t\t\"*\u0003\u0002L\u0015\t12\t\\;ti\u0016\u0014H*\u001b8l)\u0016\u001cH\u000fS1s]\u0016\u001c8\u000fC\u0003N\u000f\u0001\u0007a*\u0001\u0004pM\u001a\u001cX\r\u001e\t\u00035=K!\u0001U\u000
e\u0003\t1{gnZ\u0001\u000fCB\u0004XM\u001c3Bi>3gm]3u)\rI2+\u0016\u0005\u0006)\"\u0001\raL\u0001\na\u0006\u0014H/\u001b;j_:DQ!\u0014\u0005A\u00029CC\u0001A,`AB\u0011\u0001,X\u0007\u00023*\u0011!lW\u0001\u000bG\u0006$XmZ8sS\u0016\u001c(B\u0001/#\u00031)\u0007\u0010]3sS6,g\u000e^1m\u0013\tq\u0016L\u0001\u0005DCR,wm\u001c:z\u0003\u00151\u0018\r\\;fY\u0005\t7%\u00012\u0011\u0005\rLW\"\u00013\u000b\u0005\u00154\u0017\u0001\u0002;fgRT!!D4\u000b\u0005!$\u0013AB1qC\u000eDW-\u0003\u0002kI\ny\u0011J\u001c;fOJ\fG/[8o)\u0016\u001cH\u000f")
@Category({IntegrationTest.class})
/* loaded from: input_file:kafka/link/CompactedTopicMirrorTest.class */
public class CompactedTopicMirrorTest extends AbstractClusterLinkIntegrationTest {
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @Before
    public void setUp() {
    }

    private void setUpClusters(Option<Object> option) {
        sourceCluster().producerConfig().setProperty("batch.size", "150");
        if (option == null) {
            throw null;
        }
        if (!option.isEmpty()) {
            $anonfun$setUpClusters$1(this, BoxesRunTime.unboxToInt(option.get()));
        }
        super.setUp();
        numPartitions_$eq(2);
        Properties properties = new Properties();
        properties.put("cleanup.policy", "compact");
        properties.put("segment.bytes", "1000");
        properties.put("min.compaction.lag.ms", "5");
        properties.put("max.compaction.lag.ms", "2000");
        sourceCluster().createTopic(topic(), numPartitions(), 2, properties);
    }

    private Option<Object> setUpClusters$default$1() {
        return None$.MODULE$;
    }

    @Test
    public void testCompactedTopicMirror() {
        setUpClusters(None$.MODULE$);
        produceToSourceCluster(10);
        appendRecords$1(100L);
        appendRecords$1(4294967294L);
        destCluster().createClusterLink(linkName(), sourceCluster(), destCluster().createClusterLink$default$3(), destCluster().createClusterLink$default$4(), destCluster().createClusterLink$default$5(), destCluster().createClusterLink$default$6());
        destCluster().linkTopic(topic(), (short) 2, linkName(), destCluster().linkTopic$default$4());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        appendRecords$1(10L);
        verifyMirror(topic());
    }

    @Test
    public void testCompactedTopicMirrorWithLogCleaning() {
        setUpClusters(new Some(5));
        destCluster().createClusterLink(linkName(), sourceCluster(), destCluster().createClusterLink$default$3(), destCluster().createClusterLink$default$4(), destCluster().createClusterLink$default$5(), destCluster().createClusterLink$default$6());
        destCluster().linkTopic(topic(), (short) 2, linkName(), destCluster().linkTopic$default$4());
        produceRecords(sourceCluster().createProducer(sourceCluster().createProducer$default$1(), sourceCluster().createProducer$default$2(), sourceCluster().createProducer$default$3()), topic(), 500, obj -> {
            return $anonfun$testCompactedTopicMirrorWithLogCleaning$1(BoxesRunTime.unboxToInt(obj));
        });
        waitForLogCleaning(sourceCluster(), 400L);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        waitForLogCleaning(destCluster(), 400L);
    }

    private void waitForLogCleaning(ClusterLinkTestHarness clusterLinkTestHarness, long j) {
        partitions().foreach(topicPartition -> {
            return BoxesRunTime.boxToBoolean($anonfun$waitForLogCleaning$1(clusterLinkTestHarness, j, topicPartition));
        });
    }

    private void appendAtOffset(int i, long j) {
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        ByteBufferOutputStream byteBufferOutputStream = new ByteBufferOutputStream(allocate);
        long currentTimeMillis = System.currentTimeMillis();
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(byteBufferOutputStream, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, j, currentTimeMillis, 0L, (short) 0, 0, false, false, 0, allocate.capacity());
        byte[] bytes = new StringBuilder(4).append("key-").append(j).toString().getBytes();
        byte[] bytes2 = new StringBuilder(6).append("value-").append(j).toString().getBytes();
        memoryRecordsBuilder.append(currentTimeMillis, bytes, bytes2);
        MemoryRecords build = memoryRecordsBuilder.build();
        TopicPartition topicPartition = new TopicPartition(topic(), i);
        LogManager logManager = sourceCluster().partitionLeader(topicPartition).logManager();
        ((AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get()).appendAsFollower(build);
        Buffer<AbstractClusterLinkIntegrationTest.SourceRecord> producedRecords = producedRecords();
        AbstractClusterLinkIntegrationTest.SourceRecord sourceRecord = new AbstractClusterLinkIntegrationTest.SourceRecord(this, topic(), i, bytes, bytes2, j);
        if (producedRecords == null) {
            throw null;
        }
        producedRecords.addOne(sourceRecord);
    }

    public static final /* synthetic */ Object $anonfun$setUpClusters$1(CompactedTopicMirrorTest compactedTopicMirrorTest, int i) {
        compactedTopicMirrorTest.sourceCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.valueOf(i).toString());
        compactedTopicMirrorTest.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.valueOf(i).toString());
        return compactedTopicMirrorTest.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogDeleteDelayMsProp(), Integer.valueOf(i).toString());
    }

    public static final /* synthetic */ void $anonfun$testCompactedTopicMirror$2(CompactedTopicMirrorTest compactedTopicMirrorTest, int i, long j, long j2, int i2) {
        compactedTopicMirrorTest.appendAtOffset(i, j + (i2 * j2) + i);
    }

    public static final /* synthetic */ void $anonfun$testCompactedTopicMirror$1(CompactedTopicMirrorTest compactedTopicMirrorTest, long j, int i) {
        long nextOffset = compactedTopicMirrorTest.nextOffset(i);
        RichInt$ richInt$ = RichInt$.MODULE$;
        Range$ range$ = Range$.MODULE$;
        Range.Inclusive inclusive = new Range.Inclusive(1, 10, 1);
        if (inclusive.isEmpty()) {
            return;
        }
        int start = inclusive.start();
        while (true) {
            int i2 = start;
            $anonfun$testCompactedTopicMirror$2(compactedTopicMirrorTest, i, nextOffset, j, i2);
            if (i2 == ((Range) inclusive).scala$collection$immutable$Range$$lastElement) {
                return;
            } else {
                start = i2 + inclusive.step();
            }
        }
    }

    private final void appendRecords$1(long j) {
        RichInt$ richInt$ = RichInt$.MODULE$;
        int numPartitions = numPartitions();
        Range$ range$ = Range$.MODULE$;
        Range.Exclusive exclusive = new Range.Exclusive(0, numPartitions, 1);
        if (exclusive.isEmpty()) {
            return;
        }
        int start = exclusive.start();
        while (true) {
            int i = start;
            $anonfun$testCompactedTopicMirror$1(this, j, i);
            if (i == ((Range) exclusive).scala$collection$immutable$Range$$lastElement) {
                return;
            } else {
                start = i + exclusive.step();
            }
        }
    }

    public static final /* synthetic */ String $anonfun$testCompactedTopicMirrorWithLogCleaning$1(int i) {
        return new StringBuilder(4).append("key ").append(i % 5).toString();
    }

    public static final /* synthetic */ boolean $anonfun$waitForLogCleaning$1(ClusterLinkTestHarness clusterLinkTestHarness, long j, TopicPartition topicPartition) {
        return clusterLinkTestHarness.partitionLeader(topicPartition).logManager().cleaner().awaitCleaned(topicPartition, j, 15000L);
    }
}
