/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.gateway.admin.exporting;

import io.camunda.zeebe.broker.client.api.BrokerClient;
import io.camunda.zeebe.broker.client.api.BrokerClusterState;
import io.camunda.zeebe.broker.client.api.dto.BrokerRequest;
import io.camunda.zeebe.gateway.admin.BrokerAdminRequest;
import io.camunda.zeebe.gateway.admin.IncompleteTopologyException;
import io.camunda.zeebe.gateway.admin.exporting.ExportingControlApi;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.agrona.collections.IntHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExportingControlService
implements ExportingControlApi {
    private static final Logger LOG = LoggerFactory.getLogger(ExportingControlService.class);
    final BrokerClient brokerClient;

    public ExportingControlService(BrokerClient brokerClient) {
        this.brokerClient = brokerClient;
    }

    @Override
    public CompletableFuture<Void> pauseExporting() {
        LOG.info("Pausing exporting on all partitions.");
        BrokerClusterState topology = this.brokerClient.getTopologyManager().getTopology();
        return this.broadcastOnTopology(topology, BrokerAdminRequest::pauseExporting);
    }

    @Override
    public CompletableFuture<Void> softPauseExporting() {
        LOG.info("Soft Pausing exporting on all partitions.");
        BrokerClusterState topology = this.brokerClient.getTopologyManager().getTopology();
        return this.broadcastOnTopology(topology, BrokerAdminRequest::softPauseExporting);
    }

    @Override
    public CompletableFuture<Void> resumeExporting() {
        LOG.info("Resuming exporting on all partitions.");
        BrokerClusterState topology = this.brokerClient.getTopologyManager().getTopology();
        return this.broadcastOnTopology(topology, BrokerAdminRequest::resumeExporting);
    }

    private CompletableFuture<Void> broadcastOnTopology(BrokerClusterState topology, Consumer<BrokerAdminRequest> configureRequest) {
        this.validateTopology(topology);
        CompletableFuture[] requests = (CompletableFuture[])topology.getPartitions().stream().map(partition -> this.broadcastOnPartition(topology, (Integer)partition, configureRequest)).toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(requests);
    }

    private CompletableFuture<Void> broadcastOnPartition(BrokerClusterState topology, Integer partitionId, Consumer<BrokerAdminRequest> configureRequest) {
        int leader = topology.getLeaderForPartition(partitionId.intValue());
        Set followers = Optional.ofNullable(topology.getFollowersForPartition(partitionId.intValue())).orElseGet(Set::of);
        Set inactive = Optional.ofNullable(topology.getInactiveNodesForPartition(partitionId.intValue())).orElseGet(Set::of);
        IntHashSet members = new IntHashSet(topology.getReplicationFactor());
        members.add(leader);
        members.addAll((Collection)followers);
        members.addAll((Collection)inactive);
        CompletableFuture[] requests = (CompletableFuture[])members.stream().map(brokerId -> {
            BrokerAdminRequest request = new BrokerAdminRequest();
            request.setBrokerId((int)brokerId);
            request.setPartitionId(partitionId);
            configureRequest.accept(request);
            return this.brokerClient.sendRequest((BrokerRequest)request);
        }).toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(requests);
    }

    private void validateTopology(BrokerClusterState topology) {
        int replicationFactor = topology.getReplicationFactor();
        int expectedPartitions = topology.getPartitionsCount();
        List partitions = topology.getPartitions();
        if (partitions.size() != expectedPartitions) {
            throw new IncompleteTopologyException("Found %s partitions but expected %s, current topology: %s".formatted(partitions.size(), expectedPartitions, topology));
        }
        for (Integer partition : partitions) {
            int leaderId = topology.getLeaderForPartition(partition.intValue());
            if (leaderId == -1 || leaderId == -2) {
                throw new IncompleteTopologyException("Leader %s of partition %s is not known, current topology: %s".formatted(leaderId, partition, topology));
            }
            Set followers = Optional.ofNullable(topology.getFollowersForPartition(partition.intValue())).orElse(Collections.emptySet());
            for (Integer follower : followers) {
                if (follower != -1 && follower != -2) continue;
                throw new IncompleteTopologyException("Follower %s of partition %s is not known, current topology: %s".formatted(follower, partition, topology));
            }
            int memberCount = followers.size() + 1;
            if (memberCount == replicationFactor) continue;
            throw new IncompleteTopologyException("Expected %s members of partition %s but found %s, current topology: %s".formatted(replicationFactor, partition, memberCount, topology));
        }
    }
}

