package io.camunda.zeebe.topology.api;

import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.topology.api.ErrorResponse;
import io.camunda.zeebe.topology.api.TopologyRequestFailedException;
import io.camunda.zeebe.topology.serializer.TopologyRequestsSerializer;
import io.camunda.zeebe.topology.state.ClusterTopology;
import io.camunda.zeebe.util.Either;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.SwitchBootstraps;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Stream;

/* loaded from: input_file:io/camunda/zeebe/topology/api/TopologyRequestServer.class */
/**
 * Server side of the topology management request API.
 *
 * <p>For every topic in {@link TopologyRequestTopics} that this class handles, {@link #start()}
 * registers a reply handler on the {@link ClusterCommunicationService}. Each handler decodes the
 * incoming payload with the {@link TopologyRequestsSerializer}, delegates to the {@link
 * TopologyManagementApi}, and encodes either the successful result or an {@link ErrorResponse}
 * as the reply. {@link #close()} unsubscribes from all topics again.
 */
public final class TopologyRequestServer implements AutoCloseable {
    private final TopologyManagementApi topologyManagementApi;
    private final ClusterCommunicationService communicationService;
    // Stored from the constructor but not read anywhere in this class.
    private final ConcurrencyControl executor;
    private final TopologyRequestsSerializer serializer;

    /**
     * Creates the server; no handlers are registered until {@link #start()} is called.
     *
     * @param clusterCommunicationService service on which the reply handlers are registered
     * @param topologyRequestsSerializer decodes requests and encodes responses
     * @param topologyManagementApi API that the decoded requests are delegated to
     * @param concurrencyControl stored in the {@code executor} field
     */
    public TopologyRequestServer(ClusterCommunicationService clusterCommunicationService, TopologyRequestsSerializer topologyRequestsSerializer, TopologyManagementApi topologyManagementApi, ConcurrencyControl concurrencyControl) {
        this.topologyManagementApi = topologyManagementApi;
        this.communicationService = clusterCommunicationService;
        this.serializer = topologyRequestsSerializer;
        this.executor = concurrencyControl;
    }

    /** Registers the reply handlers for all supported topology requests. */
    public void start() {
        registerAddMemberRequestsHandler();
        registerRemoveMemberRequestsHandler();
        registerJoinPartitionRequestsHandler();
        registerLeavePartitionRequestsHandler();
        registerReassignPartitionRequestHandler();
        registerScaleRequestHandler();
        registerGetTopologyQueryHandler();
        registerTopologyCancelHandler();
    }

    /** Unsubscribes from the topic of every {@link TopologyRequestTopics} value. */
    @Override // java.lang.AutoCloseable
    public void close() {
        for (final TopologyRequestTopics requestTopic : TopologyRequestTopics.values()) {
            this.communicationService.unsubscribe(requestTopic.topic());
        }
    }

    /** Registers the handler replying to {@code ADD_MEMBER} requests. */
    private void registerAddMemberRequestsHandler() {
        this.communicationService.replyTo(
                TopologyRequestTopics.ADD_MEMBER.topic(),
                this.serializer::decodeAddMembersRequest,
                request -> mapResponse(this.topologyManagementApi.addMembers(request)),
                this::encodeResponse);
    }

    /**
     * Encodes the outcome of a topology change request.
     *
     * @param either the error (left) or the topology change response (right)
     * @return the serialized error if {@code either} is a left, otherwise the serialized response
     */
    byte[] encodeResponse(Either<ErrorResponse, TopologyChangeResponse> either) {
        if (either.isLeft()) {
            return this.serializer.encodeResponse(either.getLeft());
        }
        return this.serializer.encodeResponse(either.get());
    }

    /**
     * Encodes the outcome of a request that answers with a cluster topology.
     *
     * @param either the error (left) or the cluster topology (right)
     * @return the serialized error if {@code either} is a left, otherwise the serialized topology
     */
    byte[] encodeClusterTopologyResponse(Either<ErrorResponse, ClusterTopology> either) {
        if (either.isLeft()) {
            return this.serializer.encodeResponse(either.getLeft());
        }
        return this.serializer.encodeResponse(either.get());
    }

    /** Registers the handler replying to {@code REMOVE_MEMBER} requests. */
    private void registerRemoveMemberRequestsHandler() {
        this.communicationService.replyTo(
                TopologyRequestTopics.REMOVE_MEMBER.topic(),
                this.serializer::decodeRemoveMembersRequest,
                request -> mapResponse(this.topologyManagementApi.removeMembers(request)),
                this::encodeResponse);
    }

    /** Registers the handler replying to {@code JOIN_PARTITION} requests. */
    private void registerJoinPartitionRequestsHandler() {
        this.communicationService.replyTo(
                TopologyRequestTopics.JOIN_PARTITION.topic(),
                this.serializer::decodeJoinPartitionRequest,
                request -> mapResponse(this.topologyManagementApi.joinPartition(request)),
                this::encodeResponse);
    }

    /** Registers the handler replying to {@code LEAVE_PARTITION} requests. */
    private void registerLeavePartitionRequestsHandler() {
        this.communicationService.replyTo(
                TopologyRequestTopics.LEAVE_PARTITION.topic(),
                this.serializer::decodeLeavePartitionRequest,
                request -> mapResponse(this.topologyManagementApi.leavePartition(request)),
                this::encodeResponse);
    }

    /** Registers the handler replying to {@code REASSIGN_PARTITIONS} requests. */
    private void registerReassignPartitionRequestHandler() {
        this.communicationService.replyTo(
                TopologyRequestTopics.REASSIGN_PARTITIONS.topic(),
                this.serializer::decodeReassignPartitionsRequest,
                request -> mapResponse(this.topologyManagementApi.reassignPartitions(request)),
                this::encodeResponse);
    }

    /** Registers the handler replying to {@code SCALE_MEMBERS} requests. */
    private void registerScaleRequestHandler() {
        this.communicationService.replyTo(
                TopologyRequestTopics.SCALE_MEMBERS.topic(),
                this.serializer::decodeScaleRequest,
                request -> mapResponse(this.topologyManagementApi.scaleMembers(request)),
                this::encodeResponse);
    }

    /**
     * Registers the handler replying to {@code QUERY_TOPOLOGY} requests. The request payload is
     * passed through undecoded and ignored by the handler.
     */
    private void registerGetTopologyQueryHandler() {
        this.communicationService.replyTo(
                TopologyRequestTopics.QUERY_TOPOLOGY.topic(),
                Function.identity(),
                ignoredPayload -> mapClusterTopologyResponse(this.topologyManagementApi.getTopology()),
                this::encodeClusterTopologyResponse);
    }

    /** Registers the handler replying to {@code CANCEL_CHANGE} requests. */
    private void registerTopologyCancelHandler() {
        this.communicationService.replyTo(
                TopologyRequestTopics.CANCEL_CHANGE.topic(),
                this.serializer::decodeCancelChangeRequest,
                request -> mapClusterTopologyResponse(this.topologyManagementApi.cancelTopologyChange(request)),
                this::encodeClusterTopologyResponse);
    }

    /**
     * Converts the result of a topology change into a future that completes with a right on
     * success and with a left {@link ErrorResponse} (see {@link #mapError(Throwable)}) on failure.
     */
    private CompletableFuture<Either<ErrorResponse, TopologyChangeResponse>> mapResponse(ActorFuture<TopologyChangeResponse> actorFuture) {
        // The explicit type witness is required: inside a call chain the left type would
        // otherwise be inferred as Object and no longer match the result type of mapError.
        return actorFuture
                .toCompletableFuture()
                .thenApply(response -> Either.<ErrorResponse, TopologyChangeResponse>right(response))
                .exceptionally(TopologyRequestServer::mapError);
    }

    /**
     * Converts a cluster topology result into a future that completes with a right on success and
     * with a left {@link ErrorResponse} (see {@link #mapError(Throwable)}) on failure.
     */
    private CompletableFuture<Either<ErrorResponse, ClusterTopology>> mapClusterTopologyResponse(ActorFuture<ClusterTopology> actorFuture) {
        // Explicit type witness for the same reason as in mapResponse.
        return actorFuture
                .toCompletableFuture()
                .thenApply(topology -> Either.<ErrorResponse, ClusterTopology>right(topology))
                .exceptionally(TopologyRequestServer::mapError);
    }

    /**
     * Maps a failure to a left {@link ErrorResponse}.
     *
     * <p>The cause of {@code th} is inspected; if there is no cause, {@code th} itself is
     * inspected instead of failing with a {@link NullPointerException}. The known {@link
     * TopologyRequestFailedException} subtypes are mapped to their dedicated error code together
     * with the message of the inspected exception. Anything else is reported as {@code
     * INTERNAL_ERROR} with the message of {@code th}.
     *
     * @param th the failure handed to the {@code exceptionally} stage
     * @param <T> right type of the resulting {@link Either}
     * @return a left containing the mapped error response
     */
    private static <T> Either<ErrorResponse, T> mapError(Throwable th) {
        // NOTE(review): after a thenApply stage the failure is expected to arrive wrapped (e.g. in
        // a CompletionException), hence the unwrapping; the fallback covers unwrapped failures.
        final Throwable wrapped = th.getCause();
        final Throwable cause = wrapped != null ? wrapped : th;

        final ErrorResponse.ErrorCode errorCode;
        if (cause instanceof TopologyRequestFailedException.OperationNotAllowed) {
            errorCode = ErrorResponse.ErrorCode.OPERATION_NOT_ALLOWED;
        } else if (cause instanceof TopologyRequestFailedException.InvalidRequest) {
            errorCode = ErrorResponse.ErrorCode.INVALID_REQUEST;
        } else if (cause instanceof TopologyRequestFailedException.ConcurrentModificationException) {
            errorCode = ErrorResponse.ErrorCode.CONCURRENT_MODIFICATION;
        } else {
            return Either.left(new ErrorResponse(ErrorResponse.ErrorCode.INTERNAL_ERROR, th.getMessage()));
        }
        return Either.left(new ErrorResponse(errorCode, cause.getMessage()));
    }
}
