package org.apache.kylin.stream.coordinator;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.curator.test.TestingServer;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.stream.coordinator.exception.ClusterStateException;
import org.apache.kylin.stream.core.client.ReceiverAdminClient;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.PauseConsumersRequest;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.StartConsumersRequest;
import org.apache.kylin.stream.core.model.StopConsumersRequest;
import org.apache.kylin.stream.core.source.Partition;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/stream/coordinator/CoordinatorTest.class */
public class CoordinatorTest extends LocalFileMetadataTestCase {
    private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
    private TestingServer testingServer;
    private Coordinator coordinator;
    private CubeInstance cube;
    private StreamMetadataStore metadataStore;
    ReplicaSet rs1 = new ReplicaSet();
    ReplicaSet rs2 = new ReplicaSet();
    ReplicaSet rs3 = new ReplicaSet();
    ReplicaSet rs4 = new ReplicaSet();
    Node n1 = new Node("Node-1", 1000);
    Node n2 = new Node("Node-2", 1000);
    Node n3 = new Node("Node-3", 1000);
    Node n4 = new Node("Node-4", 1000);
    Node n5 = new Node("Node-5", 1000);
    Node n6 = new Node("Node-6", 1000);
    Node n7 = new Node("Node-7", 1000);
    Node n8 = new Node("Node-8", 1000);
    String cubeName = "MockRealtimeCube";
    String positionRs1 = "{\"1\":10001,\"2\":10002}";
    String positionRs2 = "{\"3\":10003,\"4\":10004}";
    String positionRs3 = "{\"5\":10005,\"6\":10006}";
    Partition p1 = new Partition(1);
    Partition p2 = new Partition(2);
    Partition p3 = new Partition(3);
    Partition p4 = new Partition(4);
    Partition p5 = new Partition(5);
    Partition p6 = new Partition(6);

    @Before
    public void setUp() throws Exception {
        logger.info("Setup coordinator test env.");
        staticCreateTestMetadata(new String[0]);
        this.testingServer = new TestingServer(12181, false);
        this.testingServer.start();
        System.setProperty("kylin.env.zookeeper-connect-string", "localhost:12181");
        this.metadataStore = StreamMetadataStoreFactory.getZKStreamMetaDataStore();
        initZookeeperMetadataStore();
        mockCube();
    }

    @After
    public void tearDown() throws Exception {
        this.coordinator = null;
        ZKUtil.cleanZkPath("/stream");
        StreamingUtils.getZookeeperClient().close();
        this.testingServer.stop();
        System.clearProperty("kylin.env.zookeeper-connect-string");
    }

    private void initZookeeperMetadataStore() {
        this.metadataStore.addReceiver(this.n1);
        this.metadataStore.addReceiver(this.n2);
        this.metadataStore.addReceiver(this.n3);
        this.metadataStore.addReceiver(this.n4);
        this.metadataStore.addReceiver(this.n5);
        this.metadataStore.addReceiver(this.n6);
        this.metadataStore.addReceiver(this.n7);
        this.metadataStore.addReceiver(this.n8);
        this.rs1.addNode(this.n1);
        this.rs1.addNode(this.n2);
        this.rs2.addNode(this.n3);
        this.rs2.addNode(this.n4);
        this.rs3.addNode(this.n5);
        this.rs3.addNode(this.n6);
        this.rs4.addNode(this.n7);
        this.rs4.addNode(this.n8);
        this.metadataStore.createReplicaSet(this.rs1);
        this.metadataStore.createReplicaSet(this.rs2);
        this.metadataStore.createReplicaSet(this.rs3);
        this.metadataStore.createReplicaSet(this.rs4);
        HashMap hashMap = new HashMap();
        hashMap.put(0, Lists.newArrayList(new Partition[]{this.p1, this.p2}));
        hashMap.put(1, Lists.newArrayList(new Partition[]{this.p3, this.p4}));
        hashMap.put(2, Lists.newArrayList(new Partition[]{this.p5, this.p6}));
        this.metadataStore.saveNewCubeAssignment(new CubeAssignment(this.cubeName, hashMap));
    }

    private ReceiverAdminClient mockSuccessReceiverAdminClient() throws IOException {
        ReceiverAdminClient receiverAdminClient = (ReceiverAdminClient) Mockito.mock(ReceiverAdminClient.class);
        ConsumerStatsResponse consumerStatsResponse = new ConsumerStatsResponse();
        consumerStatsResponse.setConsumePosition(this.positionRs1);
        consumerStatsResponse.setCubeName(this.cubeName);
        Mockito.when(receiverAdminClient.pauseConsumers((Node) Mockito.any(Node.class), (PauseConsumersRequest) Mockito.any(PauseConsumersRequest.class))).thenReturn(consumerStatsResponse);
        return receiverAdminClient;
    }

    private ReceiverAdminClient mockReceiverClientFailOnStopAndSync() throws IOException {
        ReceiverAdminClient receiverAdminClient = (ReceiverAdminClient) Mockito.mock(ReceiverAdminClient.class);
        ConsumerStatsResponse consumerStatsResponse = new ConsumerStatsResponse();
        consumerStatsResponse.setConsumePosition(this.positionRs1);
        consumerStatsResponse.setCubeName(this.cubeName);
        Mockito.when(receiverAdminClient.pauseConsumers((Node) Mockito.eq(this.n4), (PauseConsumersRequest) Mockito.any(PauseConsumersRequest.class))).thenThrow(new Throwable[]{new IOException("Mock Receiver Error")});
        Mockito.when(receiverAdminClient.pauseConsumers((Node) AdditionalMatchers.not(Matchers.eq(this.n4)), (PauseConsumersRequest) Mockito.any(PauseConsumersRequest.class))).thenReturn(consumerStatsResponse);
        ((ReceiverAdminClient) Mockito.doThrow(new IOException()).when(receiverAdminClient)).startConsumers((Node) Mockito.eq(this.n1), (StartConsumersRequest) Mockito.any(StartConsumersRequest.class));
        return receiverAdminClient;
    }

    private ReceiverAdminClient mockReceiverClientFailOnStartNewComsumer() throws IOException {
        ReceiverAdminClient receiverAdminClient = (ReceiverAdminClient) Mockito.mock(ReceiverAdminClient.class);
        ConsumerStatsResponse consumerStatsResponse = new ConsumerStatsResponse();
        consumerStatsResponse.setConsumePosition(this.positionRs1);
        consumerStatsResponse.setCubeName(this.cubeName);
        Mockito.when(receiverAdminClient.pauseConsumers((Node) Mockito.any(Node.class), (PauseConsumersRequest) Mockito.any(PauseConsumersRequest.class))).thenReturn(consumerStatsResponse);
        ((ReceiverAdminClient) Mockito.doThrow(new IOException()).when(receiverAdminClient)).startConsumers((Node) Mockito.eq(this.n7), (StartConsumersRequest) Mockito.any(StartConsumersRequest.class));
        ((ReceiverAdminClient) Mockito.doThrow(new IOException()).when(receiverAdminClient)).stopConsumers((Node) Mockito.eq(this.n5), (StopConsumersRequest) Mockito.any(StopConsumersRequest.class));
        return receiverAdminClient;
    }

    private void mockCube() {
        this.cube = (CubeInstance) Mockito.mock(CubeInstance.class);
        Mockito.when(Integer.valueOf(this.cube.getSourceType())).thenReturn(20);
        Mockito.when(this.cube.getName()).thenReturn(this.cubeName);
    }

    @Test
    public void testReassignWithoutExeception() throws IOException {
        this.coordinator = new Coordinator(this.metadataStore, mockSuccessReceiverAdminClient());
        Map assignments = this.metadataStore.getAssignmentsByCube(this.cubeName).getAssignments();
        HashMap hashMap = new HashMap();
        hashMap.put(1, Lists.newArrayList(new Partition[]{this.p1, this.p2, this.p3}));
        hashMap.put(2, Lists.newArrayList(new Partition[]{this.p4, this.p5}));
        hashMap.put(3, Lists.newArrayList(new Partition[]{this.p6}));
        this.coordinator.doReassign(this.cube, new CubeAssignment(this.cube.getName(), assignments), new CubeAssignment(this.cube.getName(), hashMap));
    }

    @Test(expected = ClusterStateException.class)
    public void testReassignFailOnStopAndSync() throws IOException {
        this.coordinator = new Coordinator(this.metadataStore, mockReceiverClientFailOnStopAndSync());
        Map assignments = this.metadataStore.getAssignmentsByCube(this.cubeName).getAssignments();
        HashMap hashMap = new HashMap();
        hashMap.put(1, Lists.newArrayList(new Partition[]{this.p1, this.p2, this.p3}));
        hashMap.put(2, Lists.newArrayList(new Partition[]{this.p4, this.p5}));
        hashMap.put(3, Lists.newArrayList(new Partition[]{this.p6}));
        try {
            this.coordinator.doReassign(this.cube, new CubeAssignment(this.cube.getName(), assignments), new CubeAssignment(this.cube.getName(), hashMap));
        } catch (ClusterStateException e) {
            Assert.assertSame(ClusterStateException.ClusterState.ROLLBACK_FAILED, e.getClusterState());
            Assert.assertSame(ClusterStateException.TransactionStep.STOP_AND_SNYC, e.getTransactionStep());
            System.out.println(e.getMessage());
            throw e;
        }
    }

    @Test(expected = ClusterStateException.class)
    public void testReassignFailOnStartNew() throws IOException {
        this.coordinator = new Coordinator(this.metadataStore, mockReceiverClientFailOnStartNewComsumer());
        Map assignments = this.metadataStore.getAssignmentsByCube(this.cubeName).getAssignments();
        HashMap hashMap = new HashMap();
        hashMap.put(1, Lists.newArrayList(new Partition[]{this.p1, this.p2, this.p3}));
        hashMap.put(2, Lists.newArrayList(new Partition[]{this.p4, this.p5}));
        hashMap.put(3, Lists.newArrayList(new Partition[]{this.p6}));
        try {
            this.coordinator.doReassign(this.cube, new CubeAssignment(this.cube.getName(), assignments), new CubeAssignment(this.cube.getName(), hashMap));
        } catch (ClusterStateException e) {
            Assert.assertSame(ClusterStateException.ClusterState.ROLLBACK_FAILED, e.getClusterState());
            Assert.assertSame(ClusterStateException.TransactionStep.START_NEW, e.getTransactionStep());
            System.out.println(e.getMessage());
            throw e;
        }
    }
}
