package io.confluent.kafkarest.controllers;

import io.confluent.kafkarest.entities.AlterMirrors;
import io.confluent.kafkarest.entities.Broker;
import io.confluent.kafkarest.entities.Cluster;
import io.confluent.kafkarest.entities.Mirror;
import io.confluent.kafkarest.entities.MirrorStatus;
import io.confluent.kafkarest.entities.TopicPartitionLag;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.AlterMirrorsResult;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeMirrorsOptions;
import org.apache.kafka.clients.admin.DescribeMirrorsResult;
import org.apache.kafka.clients.admin.ListMirrorsOptions;
import org.apache.kafka.clients.admin.ListMirrorsResult;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.NewMirrorTopic;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.ReplicaStatusOptions;
import org.apache.kafka.clients.admin.ReplicaStatusResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.easymock.EasyMock;
import org.easymock.EasyMockRule;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/confluent/kafkarest/controllers/MirrorManagerImplTest.class */
public final class MirrorManagerImplTest {
    private static final String PENDING_STOPPED_STATUS_STRING = "pending_stopped";

    @Rule
    public final EasyMockRule mocks = new EasyMockRule(this);

    @Mock
    private ClusterManager clusterManager;

    @Mock
    private ConfluentAdmin adminClient;

    @Mock
    private ListMirrorsResult listMirrorsResult;

    @Mock
    private DescribeMirrorsResult describeMirrorsResult;

    @Mock
    private ReplicaStatusResult replicaStatusResult;

    @Mock
    private AlterMirrorsResult alterMirrorsResult;

    @Mock
    private CreateTopicsResult createTopicsResult;
    private MirrorManagerImpl mirrorManager;
    private static final Node NODE_1 = new Node(1, "broker-1", 9091);
    private static final int NUM_PARTITIONS = 2;
    private static final Node NODE_2 = new Node(NUM_PARTITIONS, "broker-2", 9092);
    private static final Node NODE_3 = new Node(3, "broker-3", 9093);
    private static final String CLUSTER_ID = "cluster-1";
    private static final Broker BROKER_1 = Broker.fromNode(CLUSTER_ID, NODE_1);
    private static final Broker BROKER_2 = Broker.fromNode(CLUSTER_ID, NODE_2);
    private static final Broker BROKER_3 = Broker.fromNode(CLUSTER_ID, NODE_3);
    private static final Cluster CLUSTER = Cluster.create(CLUSTER_ID, BROKER_1, Arrays.asList(BROKER_1, BROKER_2, BROKER_3));
    private static final String MIRROR_TOPIC_1 = "Mirror-1";
    private static final String MIRROR_TOPIC_2 = "Mirror-2";
    private static final Collection<String> MIRROR_LISTINGS = Arrays.asList(MIRROR_TOPIC_1, MIRROR_TOPIC_2);
    private static final Long HIGH_WATERMARK = 88888888L;
    private static final Long LEO_MIRROR_TP_10 = 88888888L;
    private static final Long LEO_MIRROR_TP_11 = 88888888L;
    private static final Long LEO_MIRROR_TP_20 = 22222222L;
    private static final Long LEO_MIRROR_TP_21 = 44444444L;
    private static final ReplicaStatus.MirrorInfo MIRROR_INFO = new ReplicaStatus.MirrorInfo(ReplicaStatus.MirrorInfo.State.UNKNOWN, 0, HIGH_WATERMARK.longValue());
    private static final Map<TopicPartition, KafkaFuture<List<ReplicaStatus>>> REPLICA_STATUS_MAP = new HashMap<TopicPartition, KafkaFuture<List<ReplicaStatus>>>() { // from class: io.confluent.kafkarest.controllers.MirrorManagerImplTest.1
        {
            put(new TopicPartition(MirrorManagerImplTest.MIRROR_TOPIC_1, 0), KafkaFuture.completedFuture(Collections.singletonList(new ReplicaStatus(0, true, false, true, true, true, 0L, MirrorManagerImplTest.LEO_MIRROR_TP_10.longValue(), 0L, 0L, Optional.empty(), Optional.of(MirrorManagerImplTest.MIRROR_INFO)))));
            put(new TopicPartition(MirrorManagerImplTest.MIRROR_TOPIC_1, 1), KafkaFuture.completedFuture(Collections.singletonList(new ReplicaStatus(0, true, false, true, true, true, 0L, MirrorManagerImplTest.LEO_MIRROR_TP_11.longValue(), 0L, 0L, Optional.empty(), Optional.of(MirrorManagerImplTest.MIRROR_INFO)))));
            put(new TopicPartition(MirrorManagerImplTest.MIRROR_TOPIC_2, 0), KafkaFuture.completedFuture(Collections.singletonList(new ReplicaStatus(0, true, false, true, true, true, 0L, MirrorManagerImplTest.LEO_MIRROR_TP_20.longValue(), 0L, 0L, Optional.empty(), Optional.of(MirrorManagerImplTest.MIRROR_INFO)))));
            put(new TopicPartition(MirrorManagerImplTest.MIRROR_TOPIC_2, 1), KafkaFuture.completedFuture(Collections.singletonList(new ReplicaStatus(0, true, false, true, true, true, 0L, MirrorManagerImplTest.LEO_MIRROR_TP_21.longValue(), 0L, 0L, Optional.empty(), Optional.of(MirrorManagerImplTest.MIRROR_INFO)))));
        }
    };
    private static final MirrorTopicDescription.State PENDING_STOPPED_STATUS = MirrorTopicDescription.State.PENDING_STOPPED;
    private static final List<TopicPartitionLag> MIRROR_LAGS_TOPIC_1 = (List) new HashSet(Arrays.asList(TopicPartitionLag.create(new TopicPartition(MIRROR_TOPIC_1, 0), HIGH_WATERMARK.longValue() - LEO_MIRROR_TP_10.longValue(), HIGH_WATERMARK.longValue()), TopicPartitionLag.create(new TopicPartition(MIRROR_TOPIC_1, 1), HIGH_WATERMARK.longValue() - LEO_MIRROR_TP_11.longValue(), HIGH_WATERMARK.longValue()))).stream().collect(Collectors.toList());
    private static final List<TopicPartitionLag> MIRROR_LAGS_TOPIC_2 = (List) new HashSet(Arrays.asList(TopicPartitionLag.create(new TopicPartition(MIRROR_TOPIC_2, 0), HIGH_WATERMARK.longValue() - LEO_MIRROR_TP_20.longValue(), HIGH_WATERMARK.longValue()), TopicPartitionLag.create(new TopicPartition(MIRROR_TOPIC_2, 1), HIGH_WATERMARK.longValue() - LEO_MIRROR_TP_21.longValue(), HIGH_WATERMARK.longValue()))).stream().collect(Collectors.toList());
    private static final String LINK_1 = "link-1";
    private static final long STATE_TIME_MS = 142857;
    private static final Mirror RETURNED_MIRROR_1 = Mirror.create(LINK_1, MIRROR_TOPIC_1, MIRROR_TOPIC_1, NUM_PARTITIONS, MIRROR_LAGS_TOPIC_1, MirrorTopicDescription.State.PENDING_STOPPED, STATE_TIME_MS, Collections.emptyList());
    private static final Mirror RETURNED_MIRROR_2 = Mirror.create(LINK_1, MIRROR_TOPIC_2, MIRROR_TOPIC_2, NUM_PARTITIONS, MIRROR_LAGS_TOPIC_2, MirrorTopicDescription.State.PENDING_STOPPED, STATE_TIME_MS, Collections.emptyList());
    private static final Short REPLICATION_FACTOR = 3;
    private static final NewTopic NEW_MIRROR_TOPIC_1 = new NewTopic(MIRROR_TOPIC_1, -1, REPLICATION_FACTOR.shortValue());
    private static final NewMirrorTopic NEW_TOPIC_MIRROR_1 = new NewMirrorTopic(LINK_1, MIRROR_TOPIC_1);
    private static final MirrorTopicDescription RETURNED_MIRROR_DESCRIPTION_1 = new MirrorTopicDescription(LINK_1, UUID.randomUUID(), MIRROR_TOPIC_1, NUM_PARTITIONS, PENDING_STOPPED_STATUS, STATE_TIME_MS, Collections.emptyList());
    private static final MirrorTopicDescription RETURNED_MIRROR_DESCRIPTION_2 = new MirrorTopicDescription(LINK_1, UUID.randomUUID(), MIRROR_TOPIC_2, NUM_PARTITIONS, PENDING_STOPPED_STATUS, STATE_TIME_MS, Collections.emptyList());
    private static final Map<String, KafkaFuture<MirrorTopicDescription>> LIST_MIRROR_DESCRIPTION_MAP = new HashMap<String, KafkaFuture<MirrorTopicDescription>>() { // from class: io.confluent.kafkarest.controllers.MirrorManagerImplTest.2
        {
            put(MirrorManagerImplTest.MIRROR_TOPIC_1, KafkaFuture.completedFuture(MirrorManagerImplTest.RETURNED_MIRROR_DESCRIPTION_1));
            put(MirrorManagerImplTest.MIRROR_TOPIC_2, KafkaFuture.completedFuture(MirrorManagerImplTest.RETURNED_MIRROR_DESCRIPTION_2));
        }
    };
    private static final Map<String, KafkaFuture<MirrorTopicDescription>> TOPIC_1_MIRROR_DESCRIPTION = new HashMap<String, KafkaFuture<MirrorTopicDescription>>() { // from class: io.confluent.kafkarest.controllers.MirrorManagerImplTest.3
        {
            put(MirrorManagerImplTest.MIRROR_TOPIC_1, KafkaFuture.completedFuture(MirrorManagerImplTest.RETURNED_MIRROR_DESCRIPTION_1));
        }
    };
    private static final Map<String, KafkaFuture<MirrorTopicDescription>> ALTER_MIRROR_TOPIC_DESCRIPTION = new HashMap<String, KafkaFuture<MirrorTopicDescription>>() { // from class: io.confluent.kafkarest.controllers.MirrorManagerImplTest.4
        {
            put(MirrorManagerImplTest.MIRROR_TOPIC_1, KafkaFuture.completedFuture(MirrorManagerImplTest.RETURNED_MIRROR_DESCRIPTION_1));
            put(MirrorManagerImplTest.MIRROR_TOPIC_2, KafkaFuture.completedFuture(MirrorManagerImplTest.RETURNED_MIRROR_DESCRIPTION_2));
        }
    };
    private static final Map<String, AlterMirrorOp> PROMOTE_MIRROR_OPS = new HashMap<String, AlterMirrorOp>() { // from class: io.confluent.kafkarest.controllers.MirrorManagerImplTest.5
        {
            put(MirrorManagerImplTest.MIRROR_TOPIC_1, AlterMirrorOp.PROMOTE);
        }
    };
    private static final Map<String, AlterMirrorOp> FAILOVER_MIRROR_OPS = new HashMap<String, AlterMirrorOp>() { // from class: io.confluent.kafkarest.controllers.MirrorManagerImplTest.6
        {
            put(MirrorManagerImplTest.MIRROR_TOPIC_1, AlterMirrorOp.FAILOVER);
            put(MirrorManagerImplTest.MIRROR_TOPIC_2, AlterMirrorOp.FAILOVER);
        }
    };
    private static final Map<String, KafkaFuture<Void>> PROMOTE_MIRROR_RESULT_VALUES = new HashMap<String, KafkaFuture<Void>>() { // from class: io.confluent.kafkarest.controllers.MirrorManagerImplTest.7
        {
            put(MirrorManagerImplTest.MIRROR_TOPIC_1, KafkaFuture.completedFuture((Object) null));
        }
    };
    private static final Map<String, KafkaFuture<Void>> FAILOVER_MIRROR_RESULT_VALUES = new HashMap<String, KafkaFuture<Void>>() { // from class: io.confluent.kafkarest.controllers.MirrorManagerImplTest.8
        {
            put(MirrorManagerImplTest.MIRROR_TOPIC_1, KafkaFuture.completedFuture((Object) null));
            put(MirrorManagerImplTest.MIRROR_TOPIC_2, KafkaFuture.completedFuture((Object) null));
        }
    };
    private static final AlterMirrors PROMOTE_MIRROR_TOPIC_1 = AlterMirrors.create(MIRROR_TOPIC_1, (Integer) null, (String) null, MIRROR_LAGS_TOPIC_1);
    private static final AlterMirrors PROMOTE_MIRROR_TOPIC_2 = AlterMirrors.create(MIRROR_TOPIC_2, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()), "Operation failed because some partitions have mirror lag greater than 0", MIRROR_LAGS_TOPIC_2);
    private static final AlterMirrors FAILOVER_MIRROR_TOPIC_1 = AlterMirrors.create(MIRROR_TOPIC_1, (Integer) null, (String) null, MIRROR_LAGS_TOPIC_1);
    private static final AlterMirrors FAILOVER_MIRROR_TOPIC_2 = AlterMirrors.create(MIRROR_TOPIC_2, (Integer) null, (String) null, MIRROR_LAGS_TOPIC_2);
    private static final List<Mirror> RETURNED_MIRRORS = Arrays.asList(RETURNED_MIRROR_1, RETURNED_MIRROR_2);

    @Before
    public void setUp() {
        this.mirrorManager = new MirrorManagerImpl(this.adminClient, this.clusterManager);
    }

    @Test
    public void testListMirrors() throws Exception {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.of(CLUSTER)));
        EasyMock.expect(this.adminClient.listMirrors((ListMirrorsOptions) EasyMock.anyObject())).andReturn(this.listMirrorsResult);
        EasyMock.expect(this.adminClient.describeMirrors((Collection) EasyMock.anyObject(), (DescribeMirrorsOptions) EasyMock.anyObject())).andReturn(this.describeMirrorsResult);
        EasyMock.expect(this.adminClient.replicaStatus((Set) EasyMock.anyObject(), (ReplicaStatusOptions) EasyMock.anyObject())).andReturn(this.replicaStatusResult);
        EasyMock.expect(this.listMirrorsResult.result()).andReturn(KafkaFuture.completedFuture(MIRROR_LISTINGS));
        EasyMock.expect(this.describeMirrorsResult.result()).andReturn(LIST_MIRROR_DESCRIPTION_MAP).anyTimes();
        EasyMock.expect(this.replicaStatusResult.all()).andReturn(KafkaFuture.completedFuture((Object) null));
        EasyMock.expect(this.replicaStatusResult.result()).andReturn(REPLICA_STATUS_MAP).anyTimes();
        EasyMock.replay(new Object[]{this.clusterManager, this.adminClient, this.listMirrorsResult, this.describeMirrorsResult, this.replicaStatusResult});
        Assert.assertEquals(RETURNED_MIRRORS, (List) this.mirrorManager.listMirrors(CLUSTER_ID, MirrorStatus.fromString(PENDING_STOPPED_STATUS_STRING)).get());
        EasyMock.verify(new Object[]{this.adminClient});
    }

    @Test
    public void testGetMirrors() throws Exception {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.of(CLUSTER)));
        EasyMock.expect(this.adminClient.describeMirrors((Collection) EasyMock.anyObject(), (DescribeMirrorsOptions) EasyMock.anyObject())).andReturn(this.describeMirrorsResult);
        EasyMock.expect(this.adminClient.replicaStatus((Set) EasyMock.anyObject(), (ReplicaStatusOptions) EasyMock.anyObject())).andReturn(this.replicaStatusResult);
        EasyMock.expect(this.describeMirrorsResult.result()).andReturn(TOPIC_1_MIRROR_DESCRIPTION).anyTimes();
        EasyMock.expect(this.replicaStatusResult.all()).andReturn(KafkaFuture.completedFuture((Object) null));
        EasyMock.expect(this.replicaStatusResult.result()).andReturn(REPLICA_STATUS_MAP).anyTimes();
        EasyMock.replay(new Object[]{this.clusterManager, this.adminClient, this.listMirrorsResult, this.describeMirrorsResult, this.replicaStatusResult});
        Optional optional = (Optional) this.mirrorManager.getMirror(CLUSTER_ID, LINK_1, MIRROR_TOPIC_1).get();
        Assert.assertTrue(optional.isPresent());
        Assert.assertEquals(RETURNED_MIRROR_1, optional.get());
        EasyMock.verify(new Object[]{this.adminClient});
    }

    @Test
    public void testCreateMirror() {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.of(CLUSTER)));
        EasyMock.expect(this.adminClient.createTopics((Collection) EasyMock.eq(Collections.singletonList(NEW_MIRROR_TOPIC_1.configs(Collections.emptyMap()).mirror(Optional.of(NEW_TOPIC_MIRROR_1)))))).andReturn((Object) null);
        EasyMock.expect(this.createTopicsResult.all()).andReturn(KafkaFuture.completedFuture((Object) null));
        EasyMock.replay(new Object[]{this.clusterManager, this.adminClient, this.createTopicsResult});
        this.mirrorManager.createMirror(CLUSTER_ID, LINK_1, MIRROR_TOPIC_1, Collections.emptyMap(), REPLICATION_FACTOR);
        EasyMock.verify(new Object[]{this.adminClient});
    }

    @Test
    public void testPromoteMirror() throws ExecutionException, InterruptedException {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.of(CLUSTER)));
        EasyMock.expect(this.adminClient.alterMirrors((Map) EasyMock.eq(PROMOTE_MIRROR_OPS), (AlterMirrorsOptions) EasyMock.anyObject())).andReturn(this.alterMirrorsResult);
        EasyMock.expect(this.alterMirrorsResult.values()).andReturn(PROMOTE_MIRROR_RESULT_VALUES).anyTimes();
        EasyMock.expect(this.adminClient.describeMirrors((Collection) EasyMock.anyObject(), (DescribeMirrorsOptions) EasyMock.anyObject())).andReturn(this.describeMirrorsResult);
        EasyMock.expect(this.adminClient.replicaStatus((Set) EasyMock.anyObject(), (ReplicaStatusOptions) EasyMock.anyObject())).andReturn(this.replicaStatusResult);
        EasyMock.expect(this.describeMirrorsResult.result()).andReturn(ALTER_MIRROR_TOPIC_DESCRIPTION).anyTimes();
        EasyMock.expect(this.replicaStatusResult.all()).andReturn(KafkaFuture.completedFuture((Object) null));
        EasyMock.expect(this.replicaStatusResult.result()).andReturn(REPLICA_STATUS_MAP).anyTimes();
        EasyMock.replay(new Object[]{this.clusterManager, this.adminClient, this.alterMirrorsResult, this.describeMirrorsResult, this.replicaStatusResult});
        Map map = (Map) ((List) this.mirrorManager.promoteMirrors(CLUSTER_ID, LINK_1, new HashSet<String>() { // from class: io.confluent.kafkarest.controllers.MirrorManagerImplTest.9
            {
                add(MirrorManagerImplTest.MIRROR_TOPIC_1);
                add(MirrorManagerImplTest.MIRROR_TOPIC_2);
            }
        }, false).get()).stream().collect(Collectors.toMap((v0) -> {
            return v0.getMirrorTopicName();
        }, Function.identity()));
        Assert.assertEquals(PROMOTE_MIRROR_TOPIC_1, map.get(MIRROR_TOPIC_1));
        Assert.assertEquals(PROMOTE_MIRROR_TOPIC_2, map.get(MIRROR_TOPIC_2));
        EasyMock.verify(new Object[]{this.adminClient});
    }

    @Test
    public void testFailoverMirror() throws ExecutionException, InterruptedException {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.of(CLUSTER)));
        EasyMock.expect(this.adminClient.alterMirrors((Map) EasyMock.eq(FAILOVER_MIRROR_OPS), (AlterMirrorsOptions) EasyMock.anyObject())).andReturn(this.alterMirrorsResult);
        EasyMock.expect(this.alterMirrorsResult.values()).andReturn(FAILOVER_MIRROR_RESULT_VALUES).anyTimes();
        EasyMock.expect(this.adminClient.describeMirrors((Collection) EasyMock.anyObject(), (DescribeMirrorsOptions) EasyMock.anyObject())).andReturn(this.describeMirrorsResult);
        EasyMock.expect(this.adminClient.replicaStatus((Set) EasyMock.anyObject(), (ReplicaStatusOptions) EasyMock.anyObject())).andReturn(this.replicaStatusResult);
        EasyMock.expect(this.describeMirrorsResult.result()).andReturn(ALTER_MIRROR_TOPIC_DESCRIPTION).anyTimes();
        EasyMock.expect(this.replicaStatusResult.all()).andReturn(KafkaFuture.completedFuture((Object) null));
        EasyMock.expect(this.replicaStatusResult.result()).andReturn(REPLICA_STATUS_MAP).anyTimes();
        EasyMock.replay(new Object[]{this.clusterManager, this.adminClient, this.alterMirrorsResult, this.describeMirrorsResult, this.replicaStatusResult});
        Map map = (Map) ((List) this.mirrorManager.failOverMirrors(CLUSTER_ID, LINK_1, new HashSet<String>() { // from class: io.confluent.kafkarest.controllers.MirrorManagerImplTest.10
            {
                add(MirrorManagerImplTest.MIRROR_TOPIC_1);
                add(MirrorManagerImplTest.MIRROR_TOPIC_2);
            }
        }, false).get()).stream().collect(Collectors.toMap((v0) -> {
            return v0.getMirrorTopicName();
        }, Function.identity()));
        Assert.assertEquals(FAILOVER_MIRROR_TOPIC_1, map.get(MIRROR_TOPIC_1));
        Assert.assertEquals(FAILOVER_MIRROR_TOPIC_2, map.get(MIRROR_TOPIC_2));
        EasyMock.verify(new Object[]{this.adminClient});
    }
}
