/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metalog;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.metalog.LocalLogManager;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MockMetaLogManagerListener;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(value=40L)
public class LocalLogManagerTest {
    private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class);

    @Test
    public void testCreateAndClose() throws Exception {
        try (LocalLogManagerTestEnv env = LocalLogManagerTestEnv.createWithMockListeners(1);){
            env.close();
            Assertions.assertEquals(null, (Object)env.firstError.get());
        }
    }

    @Test
    public void testClaimsLeadership() throws Exception {
        try (LocalLogManagerTestEnv env = LocalLogManagerTestEnv.createWithMockListeners(1);){
            Assertions.assertEquals((Object)new MetaLogLeader(0, 0L), (Object)env.waitForLeader());
            env.close();
            Assertions.assertEquals(null, (Object)env.firstError.get());
        }
    }

    @Test
    public void testPassLeadership() throws Exception {
        try (LocalLogManagerTestEnv env = LocalLogManagerTestEnv.createWithMockListeners(3);){
            MetaLogLeader next;
            MetaLogLeader first;
            MetaLogLeader cur = first = env.waitForLeader();
            do {
                env.logManagers().get(cur.nodeId()).renounce(cur.epoch());
                next = env.waitForLeader();
                while (next.epoch() == cur.epoch()) {
                    Thread.sleep(1L);
                    next = env.waitForLeader();
                }
                long expectedNextEpoch = cur.epoch() + 2L;
                Assertions.assertEquals((long)expectedNextEpoch, (long)next.epoch(), (String)("Expected next epoch to be " + expectedNextEpoch + ", but found  " + next));
            } while ((cur = next).nodeId() == first.nodeId());
            env.close();
            Assertions.assertEquals(null, (Object)env.firstError.get());
        }
    }

    private static void waitForLastCommittedOffset(long targetOffset, LocalLogManager logManager) throws InterruptedException {
        TestUtils.retryOnExceptionWithTimeout((long)20000L, (long)3L, () -> {
            MockMetaLogManagerListener listener = (MockMetaLogManagerListener)logManager.listeners().get(0);
            long highestOffset = -1L;
            for (String event : listener.serializedEvents()) {
                if (!event.startsWith("LAST_COMMITTED_OFFSET")) continue;
                long offset = Long.valueOf(event.substring("LAST_COMMITTED_OFFSET".length() + 1));
                if (offset < highestOffset) {
                    throw new RuntimeException("Invalid offset: " + offset + " is less than the previous offset of " + highestOffset);
                }
                highestOffset = offset;
            }
            if (highestOffset < targetOffset) {
                throw new RuntimeException("Offset for log manager " + logManager.nodeId() + " only reached " + highestOffset);
            }
        });
    }

    @Test
    public void testCommits() throws Exception {
        try (LocalLogManagerTestEnv env = LocalLogManagerTestEnv.createWithMockListeners(3);){
            MetaLogLeader leaderInfo = env.waitForLeader();
            LocalLogManager activeLogManager = env.logManagers().get(leaderInfo.nodeId());
            long epoch = activeLogManager.leader().epoch();
            List<ApiMessageAndVersion> messages = Arrays.asList(new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(0), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(1), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(2), 0));
            Assertions.assertEquals((long)3L, (long)activeLogManager.scheduleWrite(epoch, messages));
            for (LocalLogManager logManager : env.logManagers()) {
                LocalLogManagerTest.waitForLastCommittedOffset(3L, logManager);
            }
            List listeners = env.logManagers().stream().map(m -> (MockMetaLogManagerListener)m.listeners().get(0)).collect(Collectors.toList());
            env.close();
            for (MockMetaLogManagerListener listener : listeners) {
                List<String> events = listener.serializedEvents();
                Assertions.assertEquals((Object)"SHUTDOWN", (Object)events.get(events.size() - 1));
                int foundIndex = 0;
                for (String event : events) {
                    if (!event.startsWith("COMMIT")) continue;
                    Assertions.assertEquals((Object)messages.get(foundIndex).message().toString(), (Object)event.substring("COMMIT".length() + 1));
                    ++foundIndex;
                }
                Assertions.assertEquals((int)messages.size(), (int)foundIndex);
            }
        }
    }
}

