package kafka.utils;

import io.confluent.kafka.storage.checksum.ChecksumParams;
import java.io.File;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.log.LocalLog;
import kafka.log.LocalLog$;
import kafka.log.LogLoader;
import kafka.log.LogLoader$;
import kafka.log.MergedLog;
import kafka.log.MergedLog$;
import kafka.log.TierLogComponents;
import kafka.log.TierLogComponents$;
import kafka.log.remote.RemoteLogReaderTest;
import kafka.server.BrokerTopicStats;
import kafka.server.Defaults$;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.TierPartitionStateCleanupConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.LoadedLogOffsets;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogOffsetsListener;
import org.apache.kafka.storage.internals.log.LogSegments;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import scala.None$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: SchedulerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005ua\u0001B\f\u0019\u0001uAQ\u0001\n\u0001\u0005\u0002\u0015Bq\u0001\u000b\u0001C\u0002\u0013\u0005\u0011\u0006\u0003\u00048\u0001\u0001\u0006IA\u000b\u0005\bq\u0001\u0011\r\u0011\"\u0001:\u0011\u0019i\u0004\u0001)A\u0005u!9a\b\u0001b\u0001\n\u0003y\u0004BB&\u0001A\u0003%\u0001\tC\u0004M\u0001\t\u0007I\u0011A \t\r5\u0003\u0001\u0015!\u0003A\u0011\u001dq\u0005A1A\u0005\u0002=Ca!\u0018\u0001!\u0002\u0013\u0001\u0006\"\u00020\u0001\t\u0003y\u0006\"\u00028\u0001\t\u0003y\u0006\"B:\u0001\t\u0003y\u0006\"\u0002=\u0001\t\u0003y\u0006\"\u0002>\u0001\t\u0003y\u0006\"\u0002?\u0001\t\u0003y\u0006\"\u0002@\u0001\t\u0003y\u0006BBA\u0001\u0001\u0011\u0005q\f\u0003\u0004\u0002\u0006\u0001!\ta\u0018\u0005\u0007\u0003\u0013\u0001A\u0011A0\t\r\u00055\u0001\u0001\"\u0001`\u00055\u00196\r[3ek2,'\u000fV3ti*\u0011\u0011DG\u0001\u0006kRLGn\u001d\u0006\u00027\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u001f!\ty\"%D\u0001!\u0015\u0005\t\u0013!B:dC2\f\u0017BA\u0012!\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012A\n\t\u0003O\u0001i\u0011\u0001G\u0001\ng\u000eDW\rZ;mKJ,\u0012A\u000b\t\u0003WUj\u0011\u0001\f\u0006\u0003[9\nA!\u001e;jY*\u0011q\u0006M\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0005m\t$B\u0001\u001a4\u0003\u0019\t\u0007/Y2iK*\tA'A\u0002pe\u001eL!A\u000e\u0017\u0003\u001d-\u000bgm[1TG\",G-\u001e7fe\u0006Q1o\u00195fIVdWM\u001d\u0011\u0002\u00115|7m\u001b+j[\u0016,\u0012A\u000f\t\u0003WmJ!\u0001\u0010\u0017\u0003\u00115{7m\u001b+j[\u0016\f\u0011\"\\8dWRKW.\u001a\u0011\u0002\u0011\r|WO\u001c;feF*\u0012\u0001\u0011\t\u0003\u0003&k\u0011A\u0011\u0006\u0003\u0007\u0012\u000ba!\u0019;p[&\u001c'BA#G\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003[\u001dS\u0011\u0001S\u0001\u0005U\u00064\u0018-\u0003\u0002K\u0005\ni\u0011\t^8nS\u000eLe\u000e^3hKJ\f\u0011bY8v]R,'/\r\u0011\u0002\u0011\r|WO\u001c;feJ\n\u0011bY8v]R,'O\r\u0011\u0002\u001d\rDWmY6tk6\u0004\u0016M]1ngV\t\u0001\u000b\u0005\u0002R76\t!K\u0003\u0002T)\u0006A1\r[3dWN,XN\u0003\u0002V-\u000691\u000f^8sC\u001e,'BA\u000eX\u0015\tA\u0016,A\u0005d_:4G.^3oi*\t!,\u0001\u0002j_&\u0011AL\u0015\u0002\u000f\u0007\",7m[:v[B\u000b'/Y7t\u0003=\u0019\u0007.Z2lgVl\u0007+\u0019:b[N\u0004\u0013!B:fiV\u0004H#\u00011\u0011\u0005}\t\u0017B\u00012!\u0005\u0011)f.\u001b;)\u00051!\u0007CA3m\u001b\u00051'BA4i\u0003\r\t\u0007/\u001b\u0006\u0003S*\fqA[;qSR,'O\u0003\u0002lg\u0005)!.\u001e8ji&\u0011QN\u001a\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017\u0001\u0003;fCJ$wn\u001e8)\u00055\u0001\bCA3r\u0013\t\u0011hMA\u0005BMR,'/R1dQ\u0006\u0001C/Z:u\u001b>\u001c7nU2iK\u0012,H.\u001a:O_:\u0004VM]5pI&\u001cG+Y:lQ\tqQ\u000f\u0005\u0002fm&\u0011qO\u001a\u0002\u0005)\u0016\u001cH/A\u000fuKN$Xj\\2l'\u000eDW\rZ;mKJ\u0004VM]5pI&\u001cG+Y:lQ\tyQ/\u0001\u0011uKN$(+Z3oiJ\fg\u000e\u001e+bg.Le.T8dWN\u001b\u0007.\u001a3vY\u0016\u0014\bF\u0001\tv\u0003M!Xm\u001d;O_:\u0004VM]5pI&\u001cG+Y:lQ\t\tR/A\u0012uKN$hj\u001c8QKJLw\u000eZ5d)\u0006\u001c8n\u00165f]B+'/[8e\u0013NTVM]8)\u0005I)\u0018\u0001\u0005;fgR\u0004VM]5pI&\u001cG+Y:lQ\t\u0019R/A\u0006uKN$(+Z:uCJ$\bF\u0001\u000bv\u0003i!Xm\u001d;V]N\u001c\u0007.\u001a3vY\u0016\u0004&o\u001c3vG\u0016\u0014H+Y:lQ\t)R/\u0001\ruKN$Xj\\2l'\u000eDW\rZ;mKJdunY6j]\u001eDsAFA\t\u0003/\tI\u0002E\u0002f\u0003'I1!!\u0006g\u0005\u001d!\u0016.\\3pkR\fQA^1mk\u0016t\u0012a\u0004\u0015\u0003-U\u0004")
/* loaded from: input_file:kafka/utils/SchedulerTest.class */
public class SchedulerTest {
    private final KafkaScheduler scheduler = new KafkaScheduler(1);
    private final MockTime mockTime = new MockTime();
    private final AtomicInteger counter1 = new AtomicInteger(0);
    private final AtomicInteger counter2 = new AtomicInteger(0);
    private final ChecksumParams checksumParams = TestUtils$.MODULE$.createChecksumParams();

    public KafkaScheduler scheduler() {
        return this.scheduler;
    }

    public MockTime mockTime() {
        return this.mockTime;
    }

    public AtomicInteger counter1() {
        return this.counter1;
    }

    public AtomicInteger counter2() {
        return this.counter2;
    }

    public ChecksumParams checksumParams() {
        return this.checksumParams;
    }

    @BeforeEach
    public void setup() {
        scheduler().startup();
    }

    @AfterEach
    public void teardown() {
        scheduler().shutdown();
    }

    @Test
    public void testMockSchedulerNonPeriodicTask() {
        mockTime().scheduler.scheduleOnce("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L);
        mockTime().scheduler.scheduleOnce("test2", () -> {
            this.counter2().getAndIncrement();
        }, 100L);
        Assertions.assertEquals(0, counter1().get(), "Counter1 should not be incremented prior to task running.");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented prior to task running.");
        mockTime().sleep(1L);
        Assertions.assertEquals(1, counter1().get(), "Counter1 should be incremented");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented");
        mockTime().sleep(100000L);
        Assertions.assertEquals(1, counter1().get(), "More sleeping should not result in more incrementing on counter1.");
        Assertions.assertEquals(1, counter2().get(), "Counter2 should now be incremented.");
    }

    @Test
    public void testMockSchedulerPeriodicTask() {
        mockTime().scheduler.schedule("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L, 1L);
        mockTime().scheduler.schedule("test2", () -> {
            this.counter2().getAndIncrement();
        }, 100L, 100L);
        Assertions.assertEquals(0, counter1().get(), "Counter1 should not be incremented prior to task running.");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented prior to task running.");
        mockTime().sleep(1L);
        Assertions.assertEquals(1, counter1().get(), "Counter1 should be incremented");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented");
        mockTime().sleep(100L);
        Assertions.assertEquals(101, counter1().get(), "Counter1 should be incremented 101 times");
        Assertions.assertEquals(1, counter2().get(), "Counter2 should not be incremented once");
    }

    @Test
    public void testReentrantTaskInMockScheduler() {
        mockTime().scheduler.scheduleOnce("test1", () -> {
            this.mockTime().scheduler.scheduleOnce("test2", () -> {
                this.counter2().getAndIncrement();
            }, 0L);
        }, 1L);
        mockTime().sleep(1L);
        Assertions.assertEquals(1, counter2().get());
    }

    @Test
    public void testNonPeriodicTask() {
        scheduler().scheduleOnce(RemoteLogReaderTest.TOPIC, () -> {
            this.counter1().getAndIncrement();
        });
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testNonPeriodicTask$2(this);
                Thread.sleep(5L);
                Assertions.assertEquals(1, counter1().get(), "Should only run once");
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 30000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    @Test
    public void testNonPeriodicTaskWhenPeriodIsZero() {
        scheduler().schedule(RemoteLogReaderTest.TOPIC, () -> {
            this.counter1().getAndIncrement();
        }, 0L, 0L);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testNonPeriodicTaskWhenPeriodIsZero$2(this);
                Thread.sleep(5L);
                Assertions.assertEquals(1, counter1().get(), "Should only run once");
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 30000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    @Test
    public void testPeriodicTask() {
        scheduler().schedule(RemoteLogReaderTest.TOPIC, () -> {
            this.counter1().getAndIncrement();
        }, 0L, 5L);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testPeriodicTask$2(this);
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 30000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    @Test
    public void testRestart() {
        mockTime().scheduler.scheduleOnce("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L);
        mockTime().sleep(1L);
        Assertions.assertEquals(1, counter1().get());
        mockTime().scheduler.shutdown();
        mockTime().scheduler.startup();
        mockTime().scheduler.scheduleOnce("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L);
        mockTime().sleep(1L);
        Assertions.assertEquals(2, counter1().get());
    }

    @Test
    public void testUnscheduleProducerTask() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        File randomPartitionForTopicLogDir = TestUtils$.MODULE$.randomPartitionForTopicLogDir(org.apache.kafka.test.TestUtils.tempDirectory((Path) null, (String) null), "kafka");
        LogConfig logConfig = new LogConfig(new Properties());
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        Metrics metrics = new Metrics();
        int ProducerIdExpirationMs = Defaults$.MODULE$.ProducerIdExpirationMs();
        int ProducerIdExpirationCheckIntervalMs = Defaults$.MODULE$.ProducerIdExpirationCheckIntervalMs();
        MergedLog$ mergedLog$ = MergedLog$.MODULE$;
        TopicPartition parseTopicPartitionName = LocalLog$.MODULE$.parseTopicPartitionName(randomPartitionForTopicLogDir);
        LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10);
        LogSegments logSegments = new LogSegments(parseTopicPartitionName);
        Optional maybeCreateLeaderEpochCache = MergedLog$.MODULE$.maybeCreateLeaderEpochCache(randomPartitionForTopicLogDir, parseTopicPartitionName, logDirFailureChannel, logConfig.recordVersion(), "");
        ProducerStateManager producerStateManager = new ProducerStateManager(parseTopicPartitionName, randomPartitionForTopicLogDir, 300000, new ProducerStateManagerConfig(ProducerIdExpirationMs, false), mockTime(), Optional.empty(), checksumParams());
        KafkaScheduler scheduler = scheduler();
        MockTime mockTime = mockTime();
        ChecksumParams checksumParams = checksumParams();
        LogLoader$ logLoader$ = LogLoader$.MODULE$;
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        LogLoader$ logLoader$2 = LogLoader$.MODULE$;
        LoadedLogOffsets load = new LogLoader(randomPartitionForTopicLogDir, parseTopicPartitionName, logConfig, scheduler, mockTime, logDirFailureChannel, true, logSegments, 0L, 0L, maybeCreateLeaderEpochCache, producerStateManager, concurrentHashMap, checksumParams, false).load();
        LocalLog localLog = new LocalLog(randomPartitionForTopicLogDir, logConfig, logSegments, load.recoveryPoint, load.nextOffsetMetadata, scheduler(), mockTime(), parseTopicPartitionName, logDirFailureChannel, brokerTopicStats, LogOffsetsListener.NO_OP_OFFSETS_LISTENER, checksumParams());
        FileTierPartitionState fileTierPartitionState = new FileTierPartitionState(randomPartitionForTopicLogDir, logDirFailureChannel, parseTopicPartitionName, false, mockTime().scheduler, false, true, mockTime(), TierPartitionStateCleanupConfig.EMPTY, false, -1);
        None$ none$ = None$.MODULE$;
        TierLogComponents EMPTY = TierLogComponents$.MODULE$.EMPTY();
        ChecksumParams checksumParams2 = checksumParams();
        MergedLog$ mergedLog$2 = MergedLog$.MODULE$;
        MergedLog mergedLog = new MergedLog(localLog, 0L, true, metrics, maybeCreateLeaderEpochCache, ProducerIdExpirationCheckIntervalMs, producerStateManager, none$, true, fileTierPartitionState, EMPTY, None$.MODULE$, checksumParams2);
        Assertions.assertTrue(scheduler().taskRunning(mergedLog.producerExpireCheck()));
        mergedLog.close();
        Assertions.assertFalse(scheduler().taskRunning(mergedLog.producerExpireCheck()));
    }

    @Timeout(15)
    @Test
    public void testMockSchedulerLocking() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(2);
        $colon.colon colonVar = new $colon.colon(new CountDownLatch(1), new $colon.colon(new CountDownLatch(1), Nil$.MODULE$));
        mockTime().scheduler.scheduleOnce("test1", () -> {
            scheduledTask$1((CountDownLatch) colonVar.head(), countDownLatch, countDownLatch2);
        }, 1L);
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            newSingleThreadScheduledExecutor.scheduleWithFixedDelay(() -> {
                this.mockTime().sleep(1L);
            }, 0L, 1L, TimeUnit.MILLISECONDS);
            Assertions.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
            mockTime().scheduler.scheduleOnce("test2", () -> {
                scheduledTask$1((CountDownLatch) colonVar.apply(1), countDownLatch, countDownLatch2);
            }, 1L);
            colonVar.foreach(countDownLatch3 -> {
                countDownLatch3.countDown();
                return BoxedUnit.UNIT;
            });
            Assertions.assertTrue(countDownLatch2.await(10L, TimeUnit.SECONDS), "Tasks did not complete");
        } finally {
            newSingleThreadScheduledExecutor.shutdownNow();
        }
    }

    public static final /* synthetic */ void $anonfun$testNonPeriodicTask$2(SchedulerTest schedulerTest) {
        Assertions.assertEquals(schedulerTest.counter1().get(), 1);
    }

    public static final /* synthetic */ void $anonfun$testNonPeriodicTaskWhenPeriodIsZero$2(SchedulerTest schedulerTest) {
        Assertions.assertEquals(schedulerTest.counter1().get(), 1);
    }

    public static final /* synthetic */ void $anonfun$testPeriodicTask$2(SchedulerTest schedulerTest) {
        Assertions.assertTrue(schedulerTest.counter1().get() >= 20, "Should count to 20");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void scheduledTask$1(CountDownLatch countDownLatch, CountDownLatch countDownLatch2, CountDownLatch countDownLatch3) {
        countDownLatch2.countDown();
        Assertions.assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS), "Timed out waiting for latch");
        countDownLatch3.countDown();
    }
}
