package io.camunda.zeebe.broker.partitioning;

import io.atomix.cluster.AtomixCluster;
import io.camunda.client.CamundaClient;
import io.camunda.client.api.response.Topology;
import io.camunda.security.configuration.SecurityConfigurations;
import io.camunda.service.UserServices;
import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.broker.SpringBrokerBridge;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.ClusterCfg;
import io.camunda.zeebe.broker.test.EmbeddedBrokerRule;
import io.camunda.zeebe.broker.test.TestActorSchedulerFactory;
import io.camunda.zeebe.broker.test.TestBrokerClientFactory;
import io.camunda.zeebe.broker.test.TestClusterFactory;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.springframework.security.crypto.password.PasswordEncoder;

/* loaded from: input_file:io/camunda/zeebe/broker/partitioning/PartitionLeaveTest.class */
final class PartitionLeaveTest {
    private static final MeterRegistry METER_REGISTRY = new SimpleMeterRegistry();

    PartitionLeaveTest() {
    }

    @Test
    void canStillProcessAfterLeaving(@TempDir Path path) {
        Broker buildBroker = buildBroker(path.resolve("broker-0"), brokerCfg -> {
            ClusterCfg cluster = brokerCfg.getCluster();
            cluster.setClusterSize(2);
            cluster.setNodeId(0);
            cluster.setPartitionsCount(1);
            cluster.setReplicationFactor(2);
        });
        InetSocketAddress advertisedAddress = buildBroker.getConfig().getNetwork().getInternalApi().getAdvertisedAddress();
        Broker buildBroker2 = buildBroker(path.resolve("broker-1"), brokerCfg2 -> {
            ClusterCfg cluster = brokerCfg2.getCluster();
            cluster.setInitialContactPoints(List.of(advertisedAddress.getHostName() + ":" + advertisedAddress.getPort()));
            cluster.setClusterSize(2);
            cluster.setNodeId(1);
            cluster.setPartitionsCount(1);
            cluster.setReplicationFactor(2);
        });
        Objects.requireNonNull(buildBroker);
        Objects.requireNonNull(buildBroker2);
        CompletableFuture.allOf(CompletableFuture.runAsync(buildBroker::start), CompletableFuture.runAsync(buildBroker2::start)).join();
        try {
            CamundaClient build = CamundaClient.newClientBuilder().usePlaintext().gatewayAddress("localhost:" + buildBroker.getConfig().getGateway().getNetwork().getPort()).build();
            try {
                Awaitility.await().untilAsserted(() -> {
                    TopologyAssert.assertThat((Topology) build.newTopologyRequest().send().join()).isComplete(2, 1, 2);
                });
                buildBroker2.getBrokerContext().getPartitionManager().leave(1).join();
                build.newPublishMessageCommand().messageName("msg").correlationKey("key").send().join();
                if (build != null) {
                    build.close();
                }
            } finally {
            }
        } finally {
            buildBroker.close();
            buildBroker2.close();
        }
    }

    @Test
    void shouldRemoveDataAfterLeaving(@TempDir Path path) {
        Broker buildBroker = buildBroker(path.resolve("broker-0"), brokerCfg -> {
            ClusterCfg cluster = brokerCfg.getCluster();
            cluster.setClusterSize(2);
            cluster.setNodeId(0);
            cluster.setPartitionsCount(2);
            cluster.setReplicationFactor(2);
        });
        InetSocketAddress advertisedAddress = buildBroker.getConfig().getNetwork().getInternalApi().getAdvertisedAddress();
        Broker buildBroker2 = buildBroker(path.resolve("broker-1"), brokerCfg2 -> {
            ClusterCfg cluster = brokerCfg2.getCluster();
            cluster.setInitialContactPoints(List.of(advertisedAddress.getHostName() + ":" + advertisedAddress.getPort()));
            cluster.setClusterSize(2);
            cluster.setNodeId(1);
            cluster.setPartitionsCount(2);
            cluster.setReplicationFactor(2);
        });
        Objects.requireNonNull(buildBroker);
        Objects.requireNonNull(buildBroker2);
        CompletableFuture.allOf(CompletableFuture.runAsync(buildBroker::start), CompletableFuture.runAsync(buildBroker2::start)).join();
        try {
            CamundaClient build = CamundaClient.newClientBuilder().usePlaintext().gatewayAddress("localhost:" + buildBroker.getConfig().getGateway().getNetwork().getPort()).build();
            try {
                Awaitility.await().untilAsserted(() -> {
                    TopologyAssert.assertThat((Topology) build.newTopologyRequest().send().join()).isComplete(2, 2, 2);
                });
                buildBroker2.getBrokerContext().getPartitionManager().leave(1).join();
                Assertions.assertThat(path.resolve("broker-1/data/raft-partition/partitions/1")).doesNotExist();
                Assertions.assertThat(path.resolve("broker-1/data/raft-partition/partitions/2")).isNotEmptyDirectory();
                Assertions.assertThat(path.resolve("broker-0/data/raft-partition/partitions/1")).isNotEmptyDirectory();
                Assertions.assertThat(path.resolve("broker-0/data/raft-partition/partitions/2")).isNotEmptyDirectory();
                if (build != null) {
                    build.close();
                }
            } finally {
            }
        } finally {
            buildBroker.close();
            buildBroker2.close();
        }
    }

    @Test
    void shouldNotRemoveDataIfLeavingFails(@TempDir Path path) {
        Broker buildBroker = buildBroker(path.resolve("broker-0"), brokerCfg -> {
            ClusterCfg cluster = brokerCfg.getCluster();
            cluster.setClusterSize(2);
            cluster.setNodeId(0);
            cluster.setPartitionsCount(2);
            cluster.setReplicationFactor(2);
        });
        InetSocketAddress advertisedAddress = buildBroker.getConfig().getNetwork().getInternalApi().getAdvertisedAddress();
        Broker buildBroker2 = buildBroker(path.resolve("broker-1"), brokerCfg2 -> {
            ClusterCfg cluster = brokerCfg2.getCluster();
            cluster.setInitialContactPoints(List.of(advertisedAddress.getHostName() + ":" + advertisedAddress.getPort()));
            cluster.setClusterSize(2);
            cluster.setNodeId(1);
            cluster.setPartitionsCount(2);
            cluster.setReplicationFactor(2);
        });
        Objects.requireNonNull(buildBroker);
        Objects.requireNonNull(buildBroker2);
        CompletableFuture.allOf(CompletableFuture.runAsync(buildBroker::start), CompletableFuture.runAsync(buildBroker2::start)).join();
        try {
            CamundaClient build = CamundaClient.newClientBuilder().usePlaintext().gatewayAddress("localhost:" + buildBroker.getConfig().getGateway().getNetwork().getPort()).build();
            try {
                Awaitility.await().untilAsserted(() -> {
                    TopologyAssert.assertThat((Topology) build.newTopologyRequest().send().join()).isComplete(2, 2, 2);
                });
                buildBroker.close();
                Assertions.assertThatThrownBy(() -> {
                    buildBroker2.getBrokerContext().getPartitionManager().leave(1).join();
                });
                Assertions.assertThat(path.resolve("broker-1/data/raft-partition/partitions/1")).isNotEmptyDirectory();
                Assertions.assertThat(path.resolve("broker-1/data/raft-partition/partitions/2")).isNotEmptyDirectory();
                Assertions.assertThat(path.resolve("broker-0/data/raft-partition/partitions/1")).isNotEmptyDirectory();
                Assertions.assertThat(path.resolve("broker-0/data/raft-partition/partitions/2")).isNotEmptyDirectory();
                if (build != null) {
                    build.close();
                }
            } finally {
            }
        } finally {
            buildBroker.close();
            buildBroker2.close();
        }
    }

    private static Broker buildBroker(Path path, Consumer<BrokerCfg> consumer) {
        BrokerCfg brokerCfg = new BrokerCfg();
        EmbeddedBrokerRule.assignSocketAddresses(brokerCfg);
        brokerCfg.init(path.toAbsolutePath().toString());
        consumer.accept(brokerCfg);
        ActorScheduler ofBrokerConfig = TestActorSchedulerFactory.ofBrokerConfig(brokerCfg);
        AtomixCluster createAtomixCluster = TestClusterFactory.createAtomixCluster(brokerCfg, METER_REGISTRY);
        return new Broker(new SystemContext(brokerCfg, ofBrokerConfig, createAtomixCluster, TestBrokerClientFactory.createBrokerClient(createAtomixCluster, ofBrokerConfig), SecurityConfigurations.unauthenticated(), (UserServices) null, (PasswordEncoder) null), new SpringBrokerBridge(), List.of());
    }
}
