package io.camunda.zeebe.gateway.admin.exporting;

import io.camunda.zeebe.broker.client.api.BrokerClient;
import io.camunda.zeebe.broker.client.api.BrokerClusterState;
import io.camunda.zeebe.gateway.admin.BrokerAdminRequest;
import io.camunda.zeebe.gateway.admin.IncompleteTopologyException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.agrona.collections.IntHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/gateway/admin/exporting/ExportingControlService.class */
/**
 * {@link ExportingControlApi} implementation that fans a single exporting command (pause, soft
 * pause or resume) out to every known member of every partition of the topology currently
 * reported by the {@link BrokerClient}.
 *
 * <p>The topology is validated before any request is sent. A validation failure is thrown
 * synchronously as an {@link IncompleteTopologyException}; it is not delivered through the
 * returned future.
 */
public class ExportingControlService implements ExportingControlApi {
    private static final Logger LOG = LoggerFactory.getLogger(ExportingControlService.class);
    final BrokerClient brokerClient;

    /**
     * @param brokerClient client used both to look up the current topology and to send the
     *     admin requests
     */
    public ExportingControlService(BrokerClient brokerClient) {
        this.brokerClient = brokerClient;
    }

    /**
     * Sends a request configured with {@link BrokerAdminRequest#pauseExporting()} to every member
     * of every partition.
     *
     * @return a future that completes once all sent requests have completed
     * @throws IncompleteTopologyException if the current topology fails validation
     */
    @Override
    public CompletableFuture<Void> pauseExporting() {
        LOG.info("Pausing exporting on all partitions.");
        final BrokerClusterState topology = this.brokerClient.getTopologyManager().getTopology();
        return broadcastOnTopology(topology, BrokerAdminRequest::pauseExporting);
    }

    /**
     * Sends a request configured with {@link BrokerAdminRequest#softPauseExporting()} to every
     * member of every partition.
     *
     * @return a future that completes once all sent requests have completed
     * @throws IncompleteTopologyException if the current topology fails validation
     */
    @Override
    public CompletableFuture<Void> softPauseExporting() {
        LOG.info("Soft Pausing exporting on all partitions.");
        final BrokerClusterState topology = this.brokerClient.getTopologyManager().getTopology();
        return broadcastOnTopology(topology, BrokerAdminRequest::softPauseExporting);
    }

    /**
     * Sends a request configured with {@link BrokerAdminRequest#resumeExporting()} to every member
     * of every partition.
     *
     * @return a future that completes once all sent requests have completed
     * @throws IncompleteTopologyException if the current topology fails validation
     */
    @Override
    public CompletableFuture<Void> resumeExporting() {
        LOG.info("Resuming exporting on all partitions.");
        final BrokerClusterState topology = this.brokerClient.getTopologyManager().getTopology();
        return broadcastOnTopology(topology, BrokerAdminRequest::resumeExporting);
    }

    /**
     * Validates the topology and then broadcasts to each of its partitions.
     *
     * @param topology the cluster state to validate and broadcast on
     * @param configureRequest applied to every request before it is sent
     * @return a future combining the per-partition futures
     */
    private CompletableFuture<Void> broadcastOnTopology(
            BrokerClusterState topology, Consumer<BrokerAdminRequest> configureRequest) {
        // Must run first: nothing is sent when the topology is incomplete.
        validateTopology(topology);

        final CompletableFuture<?>[] perPartition = topology.getPartitions().stream()
                .map(partitionId -> broadcastOnPartition(topology, partitionId, configureRequest))
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(perPartition);
    }

    /**
     * Sends one request to each distinct member (leader, followers and inactive nodes) of the
     * given partition.
     *
     * @param topology the cluster state to read the partition members from
     * @param partitionId the partition to broadcast on
     * @param configureRequest applied to every request before it is sent
     * @return a future combining the futures of all requests sent for this partition
     */
    private CompletableFuture<Void> broadcastOnPartition(
            BrokerClusterState topology, Integer partitionId, Consumer<BrokerAdminRequest> configureRequest) {
        final int partition = partitionId;
        final int leader = topology.getLeaderForPartition(partition);
        final Set<Integer> followers =
                Optional.ofNullable(topology.getFollowersForPartition(partition)).orElseGet(Set::of);
        final Set<Integer> inactiveNodes =
                Optional.ofNullable(topology.getInactiveNodesForPartition(partition)).orElseGet(Set::of);

        // A set, so a node listed in more than one role is only contacted once.
        final IntHashSet members = new IntHashSet(topology.getReplicationFactor());
        members.add(leader);
        members.addAll(followers);
        members.addAll(inactiveNodes);

        final CompletableFuture<?>[] perMember = members.stream()
                .map(brokerId -> sendToMember(brokerId, partition, configureRequest))
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(perMember);
    }

    /**
     * Builds an admin request addressed to one broker/partition pair, lets the caller configure it
     * and hands it to the broker client.
     */
    private CompletableFuture<?> sendToMember(
            int brokerId, int partitionId, Consumer<BrokerAdminRequest> configureRequest) {
        final BrokerAdminRequest request = new BrokerAdminRequest();
        request.setBrokerId(brokerId);
        request.setPartitionId(partitionId);
        configureRequest.accept(request);
        return this.brokerClient.sendRequest(request);
    }

    /**
     * Checks that the topology lists the expected number of partitions and that every partition
     * has a known leader, only known followers, and exactly {@code replicationFactor} members
     * when counting the leader plus its followers.
     *
     * @throws IncompleteTopologyException on the first violated expectation
     */
    private void validateTopology(BrokerClusterState topology) {
        final int replicationFactor = topology.getReplicationFactor();
        final int expectedPartitions = topology.getPartitionsCount();
        final List<Integer> partitions = topology.getPartitions();

        if (partitions.size() != expectedPartitions) {
            throw new IncompleteTopologyException(
                    "Found %s partitions but expected %s, current topology: %s"
                            .formatted(partitions.size(), expectedPartitions, topology));
        }

        for (final Integer partitionId : partitions) {
            validatePartition(topology, partitionId, replicationFactor);
        }
    }

    /** Validates leader, followers and member count of a single partition. */
    private void validatePartition(BrokerClusterState topology, int partitionId, int replicationFactor) {
        final int leader = topology.getLeaderForPartition(partitionId);
        if (isUnknownNode(leader)) {
            throw new IncompleteTopologyException(
                    "Leader %s of partition %s is not known, current topology: %s"
                            .formatted(leader, partitionId, topology));
        }

        final Set<Integer> followers =
                Optional.ofNullable(topology.getFollowersForPartition(partitionId))
                        .orElse(Collections.emptySet());
        for (final Integer follower : followers) {
            if (isUnknownNode(follower)) {
                throw new IncompleteTopologyException(
                        "Follower %s of partition %s is not known, current topology: %s"
                                .formatted(follower, partitionId, topology));
            }
        }

        // Leader plus followers; inactive nodes are not counted here.
        final int memberCount = followers.size() + 1;
        if (memberCount != replicationFactor) {
            throw new IncompleteTopologyException(
                    "Expected %s members of partition %s but found %s, current topology: %s"
                            .formatted(replicationFactor, partitionId, memberCount, topology));
        }
    }

    /**
     * Returns whether the node id is one of the two values this service treats as "not known".
     *
     * <p>NOTE(review): -1 and -2 presumably mirror sentinel constants declared on the cluster
     * state API - confirm and reference them by name if they are accessible.
     */
    private static boolean isUnknownNode(int nodeId) {
        return nodeId == -1 || nodeId == -2;
    }
}
