/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metalog;

import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.metalog.LocalLogManager;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.metalog.MockMetaLogManagerListener;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=40L)
public class LocalLogManagerTest {
    @Test
    public void testCreateAndClose() throws Exception {
        try (LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1).buildWithMockListeners();){
            env.close();
            Assertions.assertNull((Object)env.firstError.get());
        }
    }

    @Test
    public void testClaimsLeadership() throws Exception {
        try (LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1).buildWithMockListeners();){
            Assertions.assertEquals((Object)new LeaderAndEpoch(OptionalInt.of(0), 1), (Object)env.waitForLeader());
            env.close();
            Assertions.assertNull((Object)env.firstError.get());
        }
    }

    @Test
    public void testPassLeadership() throws Exception {
        try (LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3).buildWithMockListeners();){
            LeaderAndEpoch next;
            LeaderAndEpoch first;
            LeaderAndEpoch cur = first = env.waitForLeader();
            do {
                int currentLeaderId = cur.leaderId().orElseThrow(() -> new AssertionError((Object)"Current leader is undefined"));
                env.logManagers().get(currentLeaderId).resign(cur.epoch());
                next = env.waitForLeader();
                while (next.epoch() == cur.epoch()) {
                    Thread.sleep(1L);
                    next = env.waitForLeader();
                }
                long expectedNextEpoch = cur.epoch() + 2;
                Assertions.assertEquals((long)expectedNextEpoch, (long)next.epoch(), (String)("Expected next epoch to be " + expectedNextEpoch + ", but found  " + next));
            } while ((cur = next).leaderId().equals(first.leaderId()));
            env.close();
            Assertions.assertNull((Object)env.firstError.get());
        }
    }

    private static void waitForLastCommittedOffset(long targetOffset, LocalLogManager logManager) throws InterruptedException {
        TestUtils.retryOnExceptionWithTimeout((long)20000L, (long)3L, () -> {
            MockMetaLogManagerListener listener = (MockMetaLogManagerListener)logManager.listeners().get(0);
            long highestOffset = -1L;
            for (String event : listener.serializedEvents()) {
                if (!event.startsWith("LAST_COMMITTED_OFFSET")) continue;
                long offset = Long.parseLong(event.substring("LAST_COMMITTED_OFFSET".length() + 1));
                if (offset < highestOffset) {
                    throw new RuntimeException("Invalid offset: " + offset + " is less than the previous offset of " + highestOffset);
                }
                highestOffset = offset;
            }
            if (highestOffset < targetOffset) {
                throw new RuntimeException("Offset for log manager " + logManager.nodeId() + " only reached " + highestOffset);
            }
        });
    }

    @Test
    public void testCommits() throws Exception {
        try (LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3).buildWithMockListeners();){
            LeaderAndEpoch leaderInfo = env.waitForLeader();
            int leaderId = leaderInfo.leaderId().orElseThrow(() -> new AssertionError((Object)"Current leader is undefined"));
            LocalLogManager activeLogManager = env.logManagers().get(leaderId);
            int epoch = activeLogManager.leaderAndEpoch().epoch();
            List<ApiMessageAndVersion> messages = Arrays.asList(new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(0), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(1), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(2), 0));
            Assertions.assertEquals((long)3L, (long)activeLogManager.scheduleAppend(epoch, messages));
            for (LocalLogManager logManager : env.logManagers()) {
                LocalLogManagerTest.waitForLastCommittedOffset(3L, logManager);
            }
            List listeners = env.logManagers().stream().map(m -> (MockMetaLogManagerListener)m.listeners().get(0)).collect(Collectors.toList());
            env.close();
            for (MockMetaLogManagerListener listener : listeners) {
                List<String> events = listener.serializedEvents();
                Assertions.assertEquals((Object)"SHUTDOWN", (Object)events.get(events.size() - 1));
                int foundIndex = 0;
                for (String event : events) {
                    if (!event.startsWith("COMMIT")) continue;
                    Assertions.assertEquals((Object)messages.get(foundIndex).message().toString(), (Object)event.substring("COMMIT".length() + 1));
                    ++foundIndex;
                }
                Assertions.assertEquals((int)messages.size(), (int)foundIndex);
            }
        }
    }
}

