/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AlterMirrorTopicsRequestData;
import org.apache.kafka.common.message.AlterMirrorTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.metadata.MirrorTopicRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.MirrorTopicControlManager;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class MirrorTopicControlManagerTest {
    LogContext logContext = new LogContext();
    SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());

    @Test
    public void testAddMirrorTopicRecord() {
        Map<String, Uuid> clusterLinks = Collections.singletonMap("link", Uuid.randomUuid());
        MirrorTopicControlManager mirrorTopicControl = new MirrorTopicControlManager(this.snapshotRegistry, this.logContext, Time.SYSTEM, __ -> Optional.empty(), linkName -> Optional.ofNullable(clusterLinks.get(linkName)));
        ArrayList mirrorTopicRecord = new ArrayList(1);
        Uuid topicId = Uuid.randomUuid();
        ApiError error = mirrorTopicControl.maybeAddMirrorTopicRecord(new CreateTopicsRequestData.CreatableTopic().setName("topic").setLinkName("link").setMirrorTopic("foo-origin").setMirrorTopicId(Uuid.ZERO_UUID), topicId, mirrorTopicRecord::add);
        Assertions.assertEquals((Object)ApiError.NONE, (Object)error);
        Assertions.assertEquals((int)1, (int)mirrorTopicRecord.size());
        MirrorTopicRecord record = (MirrorTopicRecord)((ApiMessageAndVersion)mirrorTopicRecord.get(0)).message();
        Assertions.assertEquals((Object)record.topicId(), (Object)topicId);
        Assertions.assertEquals((Object)record.clusterLinkId(), (Object)clusterLinks.get("link"));
        Assertions.assertEquals((Object)record.mirrorTopicState(), (Object)MirrorTopic.State.MIRROR.stateName());
        Assertions.assertEquals((Object)record.sourceTopicId(), (Object)Uuid.ZERO_UUID);
        Assertions.assertEquals((Object)record.sourceTopicName(), (Object)"foo-origin");
    }

    @Test
    public void testAddRecordMissingLink() {
        MirrorTopicControlManager mirrorTopicControl = new MirrorTopicControlManager(this.snapshotRegistry, this.logContext, Time.SYSTEM, __ -> Optional.empty(), __ -> Optional.empty());
        ApiError error = mirrorTopicControl.maybeAddMirrorTopicRecord(new CreateTopicsRequestData.CreatableTopic().setName("topic").setLinkName("link").setMirrorTopic("foo-origin").setMirrorTopicId(Uuid.ZERO_UUID), Uuid.randomUuid(), Collections.emptyList()::add);
        Assertions.assertEquals((Object)Errors.CLUSTER_LINK_NOT_FOUND, (Object)error.error());
    }

    @Test
    public void testAddRecordCreateTopic() {
        MirrorTopicControlManager mirrorTopicControl = new MirrorTopicControlManager(this.snapshotRegistry, this.logContext, Time.SYSTEM, __ -> Optional.empty(), __ -> Optional.empty());
        ArrayList mirrorTopicRecord = new ArrayList(1);
        ApiError error = mirrorTopicControl.maybeAddMirrorTopicRecord(new CreateTopicsRequestData.CreatableTopic().setName("topic"), Uuid.randomUuid(), mirrorTopicRecord::add);
        Assertions.assertEquals((Object)Errors.NONE, (Object)error.error());
        Assertions.assertEquals((int)0, (int)mirrorTopicRecord.size());
        error = mirrorTopicControl.maybeAddMirrorTopicRecord(new CreateTopicsRequestData.CreatableTopic().setName("topic").setMirrorTopic("foo-origin"), Uuid.randomUuid(), mirrorTopicRecord::add);
        Assertions.assertEquals((Object)Errors.INVALID_REQUEST, (Object)error.error());
        Assertions.assertEquals((int)0, (int)mirrorTopicRecord.size());
        error = mirrorTopicControl.maybeAddMirrorTopicRecord(new CreateTopicsRequestData.CreatableTopic().setName("topic").setLinkName("link"), Uuid.randomUuid(), mirrorTopicRecord::add);
        Assertions.assertEquals((Object)Errors.INVALID_REQUEST, (Object)error.error());
        Assertions.assertEquals((int)0, (int)mirrorTopicRecord.size());
    }

    @Test
    public void testAlterMirrors() {
        HashMap<String, Uuid> topicIds = new HashMap<String, Uuid>();
        HashMap<String, Uuid> linkIds = new HashMap<String, Uuid>();
        MirrorTopicControlManager mirrorTopicControl = new MirrorTopicControlManager(this.snapshotRegistry, this.logContext, Time.SYSTEM, topicId -> Optional.ofNullable(topicIds.get(topicId)), linkId -> Optional.ofNullable(linkIds.get(linkId)));
        Uuid topicId2 = Uuid.randomUuid();
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>(1);
        ApiError error = mirrorTopicControl.maybeAddMirrorTopicRecord(new CreateTopicsRequestData.CreatableTopic().setName("foo").setLinkName("cluster-link").setMirrorTopic("foo-origin").setMirrorTopicId(Uuid.randomUuid()), topicId2, records::add);
        Assertions.assertEquals((Object)Errors.CLUSTER_LINK_NOT_FOUND, (Object)error.error());
        Assertions.assertEquals((int)0, (int)records.size());
        linkIds.put("cluster-link", Uuid.randomUuid());
        error = mirrorTopicControl.maybeAddMirrorTopicRecord(new CreateTopicsRequestData.CreatableTopic().setName("foo").setLinkName("cluster-link").setMirrorTopic("foo-origin").setMirrorTopicId(Uuid.randomUuid()), topicId2, records::add);
        Assertions.assertEquals((Object)Errors.NONE, (Object)error.error());
        Assertions.assertEquals((int)1, (int)records.size());
        RecordTestUtils.replayAll(mirrorTopicControl, records);
        ControllerResult result = mirrorTopicControl.alterMirrorTopics(MirrorTopicControlManagerTest.alterMirrorTopicRequest(alterMirrorTopic -> alterMirrorTopic.setTopic("foo").setMirrorTopicState(MirrorTopic.State.FAILED.stateName()), true));
        Assertions.assertEquals((short)Errors.UNKNOWN_TOPIC_ID.code(), (short)((AlterMirrorTopicsResponseData.AlterMirrorResult)((AlterMirrorTopicsResponseData)result.response()).alterMirrorResults().get(0)).errorCode());
        topicIds.put("foo", topicId2);
        result = mirrorTopicControl.alterMirrorTopics(MirrorTopicControlManagerTest.alterMirrorTopicRequest(alterMirrorTopic -> alterMirrorTopic.setTopic("foo").setMirrorTopicState(MirrorTopic.State.FAILED.stateName()), true));
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((AlterMirrorTopicsResponseData)result.response()).errorCode());
        Assertions.assertEquals((int)0, (int)result.records().size());
        result = mirrorTopicControl.alterMirrorTopics(MirrorTopicControlManagerTest.alterMirrorTopicRequest(alterMirrorTopic -> alterMirrorTopic.setTopic("foo").setMirrorTopicState(MirrorTopic.State.FAILED.stateName()), false));
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((AlterMirrorTopicsResponseData)result.response()).errorCode());
        Assertions.assertEquals((int)1, (int)result.records().size());
        RecordTestUtils.replayAll(mirrorTopicControl, result.records());
        Assertions.assertEquals((Object)MirrorTopic.State.FAILED, mirrorTopicControl.mirrorTopic(topicId2).map(MirrorTopic::mirrorState).orElse(null));
    }

    @Test
    public void testMirrorTopicStateMachine() {
        HashMap<MirrorTopic.State, HashSet<MirrorTopic.State>> allowed = new HashMap<MirrorTopic.State, HashSet<MirrorTopic.State>>();
        allowed.put(MirrorTopic.State.MIRROR, new HashSet<MirrorTopic.State>(Arrays.asList(MirrorTopic.State.MIRROR, MirrorTopic.State.PAUSED, MirrorTopic.State.FAILED, MirrorTopic.State.PENDING_STOPPED)));
        allowed.put(MirrorTopic.State.PAUSED, new HashSet<MirrorTopic.State>(Arrays.asList(MirrorTopic.State.PAUSED, MirrorTopic.State.MIRROR, MirrorTopic.State.FAILED, MirrorTopic.State.PENDING_STOPPED)));
        allowed.put(MirrorTopic.State.FAILED, new HashSet<MirrorTopic.State>(Arrays.asList(MirrorTopic.State.FAILED, MirrorTopic.State.PAUSED, MirrorTopic.State.PENDING_STOPPED)));
        allowed.put(MirrorTopic.State.PENDING_STOPPED, new HashSet<MirrorTopic.State>(Arrays.asList(MirrorTopic.State.PENDING_STOPPED, MirrorTopic.State.STOPPED)));
        allowed.put(MirrorTopic.State.STOPPED, new HashSet<MirrorTopic.State>(Collections.singletonList(MirrorTopic.State.STOPPED)));
        for (MirrorTopic.State initialState : MirrorTopic.State.values()) {
            for (MirrorTopic.State proposedState : MirrorTopic.State.values()) {
                boolean isAllowed = ((Set)allowed.get(initialState)).contains(proposedState);
                if (isAllowed) {
                    Assertions.assertEquals((Object)Errors.NONE, (Object)this.tryStateChange(initialState, proposedState), (String)("Expected no error for transition from " + initialState + " to " + proposedState));
                    continue;
                }
                Assertions.assertEquals((Object)Errors.INVALID_REQUEST, (Object)this.tryStateChange(initialState, proposedState), (String)("Expected an error for transition from " + initialState + " to " + proposedState));
            }
        }
    }

    Errors tryStateChange(MirrorTopic.State initialState, MirrorTopic.State state) {
        Uuid topicId = Uuid.randomUuid();
        MirrorTopic mirrorTopic = MirrorTopicControlManagerTest.mirrorTopic(topicId, initialState);
        MirrorTopicControlManager mirrorTopicControl = new MirrorTopicControlManager(this.snapshotRegistry, this.logContext, Time.SYSTEM, __ -> Optional.of(topicId), __ -> Optional.empty());
        mirrorTopicControl.replay(MirrorTopic.toSnapshotRecord((MirrorTopic)mirrorTopic, (String)"foo"));
        Assertions.assertTrue((boolean)mirrorTopicControl.isMirrorTopic(topicId));
        AlterMirrorTopicsRequestData.AlterMirrorTopic alterMirrorTopic = new AlterMirrorTopicsRequestData.AlterMirrorTopic();
        alterMirrorTopic.setTopic("foo").setMirrorTopicState(state.stateName());
        return mirrorTopicControl.alterMirrorState(alterMirrorTopic, __ -> {}).error();
    }

    static MirrorTopic mirrorTopic(Uuid topicId, MirrorTopic.State state) {
        return MirrorTopic.fromRecord((MirrorTopicRecord)new MirrorTopicRecord().setClusterLinkId(Uuid.randomUuid()).setClusterLinkName("foo-link").setTopicId(topicId).setTopicName("foo").setMirrorTopicState(state.stateName()).setSourceTopicName("foo-origin").setSourceTopicId(Uuid.randomUuid()).setPreviousToPausedState(MirrorTopic.State.MIRROR.stateName()));
    }

    static AlterMirrorTopicsRequestData alterMirrorTopicRequest(Consumer<AlterMirrorTopicsRequestData.AlterMirrorTopic> consumer, boolean validateOnly) {
        AlterMirrorTopicsRequestData request = new AlterMirrorTopicsRequestData();
        AlterMirrorTopicsRequestData.AlterMirrorTopic alterMirrorTopic = new AlterMirrorTopicsRequestData.AlterMirrorTopic();
        consumer.accept(alterMirrorTopic);
        request.setAlterMirrorTopics(Collections.singletonList(alterMirrorTopic));
        request.setValidateOnly(validateOnly);
        return request;
    }
}

