package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterId;
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerConnectionState;
import com.mongodb.connection.ServerDescription;
import com.mongodb.connection.ServerType;
import com.mongodb.event.ClusterDescriptionChangedEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoServerSelectorTest.class */
public class PipelinedMongoServerSelectorTest {
    public static final Logger LOG = LoggerFactory.getLogger(PipelinedMongoServerSelector.class);
    private static final ClusterId TEST_CLUSTER_ID = new ClusterId("test-mongo-cluster");
    private final AtomicInteger serverId = new AtomicInteger(0);
    private final List<ServerDescription> allReplicas = List.of(createServerDescription(ServerType.REPLICA_SET_PRIMARY, ServerConnectionState.CONNECTED), createServerDescription(ServerType.REPLICA_SET_SECONDARY, ServerConnectionState.CONNECTED), createServerDescription(ServerType.REPLICA_SET_SECONDARY, ServerConnectionState.CONNECTED));
    private final List<ServerDescription> oneSecondaryUp = List.of(createServerDescription(ServerType.REPLICA_SET_PRIMARY, ServerConnectionState.CONNECTED), createServerDescription(ServerType.REPLICA_SET_SECONDARY, ServerConnectionState.CONNECTED));
    private final List<ServerDescription> noSecondaryUp = List.of(createServerDescription(ServerType.REPLICA_SET_PRIMARY, ServerConnectionState.CONNECTED));
    private ExecutorService threadAscending;
    private ExecutorService threadDescending;
    private ExecutorService threadOther;

    @Before
    public void setUp() {
        this.threadAscending = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("mongo-dump-ascending").setDaemon(true).build());
        this.threadDescending = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("mongo-dump-descending").setDaemon(true).build());
        this.threadOther = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("mongo-server-selector-test-thread").setDaemon(true).build());
    }

    @After
    public void tearDown() {
        this.threadAscending.shutdown();
        this.threadDescending.shutdown();
        this.threadOther.shutdown();
    }

    @Test
    public void calledFromANonDownloaderThread() throws ExecutionException, InterruptedException {
        PipelinedMongoServerSelector pipelinedMongoServerSelector = new PipelinedMongoServerSelector("mongo-dump-");
        this.threadOther.submit(() -> {
            Assert.assertEquals(3L, pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, this.allReplicas).size());
            Assert.assertEquals(2L, pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, this.oneSecondaryUp).size());
            Assert.assertEquals(1L, pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, this.noSecondaryUp).size());
        }).get();
    }

    @Test
    public void allMongoReplicasUp() throws ExecutionException, InterruptedException {
        PipelinedMongoServerSelector pipelinedMongoServerSelector = new PipelinedMongoServerSelector("mongo-dump-");
        AtomicReference atomicReference = new AtomicReference();
        this.threadAscending.submit(() -> {
            List select = pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, this.allReplicas);
            Assert.assertEquals(1L, select.size());
            ServerDescription serverDescription = (ServerDescription) select.get(0);
            Assert.assertEquals(ServerType.REPLICA_SET_SECONDARY, serverDescription.getType());
            Assert.assertTrue(this.allReplicas.contains(serverDescription));
            atomicReference.set(serverDescription);
            List select2 = pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, this.allReplicas);
            Assert.assertEquals(1L, select2.size());
            Assert.assertEquals(serverDescription, select2.get(0));
        }).get();
        this.threadDescending.submit(() -> {
            List select = pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, this.allReplicas);
            Assert.assertEquals(1L, select.size());
            ServerDescription serverDescription = (ServerDescription) select.get(0);
            Assert.assertEquals(ServerType.REPLICA_SET_SECONDARY, serverDescription.getType());
            Assert.assertTrue(this.allReplicas.contains(serverDescription));
            Assert.assertNotEquals(atomicReference.get(), serverDescription);
        }).get();
    }

    @Test
    public void primaryAndOneSecondaryUp() throws ExecutionException, InterruptedException {
        PipelinedMongoServerSelector pipelinedMongoServerSelector = new PipelinedMongoServerSelector("mongo-dump-");
        pipelinedMongoServerSelector.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(TEST_CLUSTER_ID, new ClusterDescription(ClusterConnectionMode.SINGLE, ClusterType.REPLICA_SET, this.oneSecondaryUp), new ClusterDescription(ClusterConnectionMode.SINGLE, ClusterType.REPLICA_SET, this.oneSecondaryUp)));
        AtomicReference atomicReference = new AtomicReference();
        this.threadAscending.submit(() -> {
            List select = pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, this.oneSecondaryUp);
            Assert.assertEquals(1L, select.size());
            ServerDescription serverDescription = (ServerDescription) select.get(0);
            Assert.assertEquals(ServerType.REPLICA_SET_SECONDARY, serverDescription.getType());
            Assert.assertTrue(this.oneSecondaryUp.contains(serverDescription));
            atomicReference.set(serverDescription);
        }).get();
        this.threadDescending.submit(() -> {
            Assert.assertTrue(pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, this.oneSecondaryUp).isEmpty());
        }).get();
        ExecutorService executorService = this.threadAscending;
        Objects.requireNonNull(pipelinedMongoServerSelector);
        executorService.submit(pipelinedMongoServerSelector::threadFinished).get();
        this.threadDescending.submit(() -> {
            List select = pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, this.oneSecondaryUp);
            Assert.assertEquals(1L, select.size());
            Assert.assertEquals(atomicReference.get(), select.get(0));
        }).get();
    }

    @Test
    public void onlyPrimaryUp() throws ExecutionException, InterruptedException {
        PipelinedMongoServerSelector pipelinedMongoServerSelector = new PipelinedMongoServerSelector("mongo-dump-");
        this.threadAscending.submit(() -> {
            Assert.assertTrue(pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, this.noSecondaryUp).isEmpty());
        }).get();
    }

    @Test
    public void mongoScaling() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(createServerDescription(ServerType.REPLICA_SET_PRIMARY, ServerConnectionState.CONNECTED));
        arrayList.add(createServerDescription(ServerType.REPLICA_SET_SECONDARY, ServerConnectionState.CONNECTED));
        arrayList.add(createServerDescription(ServerType.REPLICA_SET_SECONDARY, ServerConnectionState.CONNECTED));
        ClusterDescription clusterDescription = new ClusterDescription(ClusterConnectionMode.SINGLE, ClusterType.REPLICA_SET, List.copyOf(arrayList));
        PipelinedMongoServerSelector pipelinedMongoServerSelector = new PipelinedMongoServerSelector("mongo-dump-");
        pipelinedMongoServerSelector.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(TEST_CLUSTER_ID, clusterDescription, clusterDescription));
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        this.threadAscending.submit(() -> {
            List select = pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, arrayList);
            Assert.assertEquals(1L, select.size());
            atomicReference.set((ServerDescription) select.get(0));
        }).get();
        this.threadDescending.submit(() -> {
            List select = pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, arrayList);
            Assert.assertEquals(1L, select.size());
            atomicReference2.set((ServerDescription) select.get(0));
        }).get();
        ServerDescription serverDescription = (ServerDescription) atomicReference.get();
        ServerDescription serverDescription2 = (ServerDescription) arrayList.stream().filter((v0) -> {
            return v0.isPrimary();
        }).findFirst().get();
        arrayList.remove(serverDescription);
        ClusterDescription clusterDescription2 = new ClusterDescription(ClusterConnectionMode.SINGLE, ClusterType.REPLICA_SET, List.copyOf(arrayList));
        pipelinedMongoServerSelector.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(TEST_CLUSTER_ID, clusterDescription, clusterDescription2));
        this.threadAscending.submit(() -> {
            Assert.assertTrue(pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, arrayList).isEmpty());
            atomicReference.set(null);
        }).get();
        this.threadDescending.submit(() -> {
            List select = pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, arrayList);
            Assert.assertEquals(1L, select.size());
            Assert.assertEquals(atomicReference2.get(), select.get(0));
        }).get();
        arrayList.add(serverDescription);
        this.threadAscending.submit(() -> {
            List select = pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, List.copyOf(arrayList));
            Assert.assertEquals(1L, select.size());
            atomicReference.set((ServerDescription) select.get(0));
            Assert.assertNotEquals(atomicReference2.get(), select.get(0));
            Assert.assertEquals(serverDescription, select.get(0));
        }).get();
        ServerDescription updateServerType = updateServerType(serverDescription2, ServerType.REPLICA_SET_SECONDARY);
        ServerDescription updateServerType2 = updateServerType(serverDescription, ServerType.REPLICA_SET_PRIMARY);
        arrayList.set(arrayList.indexOf(serverDescription2), updateServerType);
        arrayList.set(arrayList.indexOf(serverDescription), updateServerType2);
        pipelinedMongoServerSelector.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(TEST_CLUSTER_ID, new ClusterDescription(ClusterConnectionMode.SINGLE, ClusterType.REPLICA_SET, List.copyOf(arrayList)), clusterDescription2));
        this.threadAscending.submit(() -> {
            Assert.assertTrue("Failed to detect that it is connected to primary", pipelinedMongoServerSelector.isConnectedToPrimary());
            pipelinedMongoServerSelector.threadFinished();
            List select = pipelinedMongoServerSelector.select(ClusterType.REPLICA_SET, arrayList);
            Assert.assertEquals(1L, select.size());
            atomicReference.set((ServerDescription) select.get(0));
            Assert.assertEquals(updateServerType.getSetName(), ((ServerDescription) select.get(0)).getSetName());
        }).get();
    }

    public ServerDescription createServerDescription(ServerType serverType, ServerConnectionState serverConnectionState) {
        int incrementAndGet = this.serverId.incrementAndGet();
        return ServerDescription.builder().ok(true).setName("mongo-server-" + incrementAndGet).address(new ServerAddress("localhost", 20000 + incrementAndGet)).state(serverConnectionState).type(serverType).build();
    }

    private ServerDescription updateServerType(ServerDescription serverDescription, ServerType serverType) {
        return ServerDescription.builder().ok(serverDescription.isOk()).setName(serverDescription.getSetName()).address(serverDescription.getAddress()).state(serverDescription.getState()).type(serverType).build();
    }
}
