/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.MessagingException;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.transport.stream.impl.AggregatedClientStream;
import io.camunda.zeebe.transport.stream.impl.ClientStreamRegistration;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.ErrorResponse;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.StreamResponseDecoder;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.VisibleForTesting;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import java.lang.runtime.SwitchBootstraps;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ClientStreamRequestManager<M extends BufferWriter> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClientStreamRequestManager.class);
    private static final byte[] REMOVE_ALL_REQUEST = new byte[0];
    private static final Duration RETRY_DELAY = Duration.ofSeconds(1L);
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(5L);
    private final Map<MemberId, Map<UUID, ClientStreamRegistration<M>>> registrations = new HashMap<MemberId, Map<UUID, ClientStreamRegistration<M>>>();
    private final StreamResponseDecoder responseDecoder = new StreamResponseDecoder();
    private final ClusterCommunicationService communicationService;
    private final ConcurrencyControl executor;

    ClientStreamRequestManager(ClusterCommunicationService communicationService, ConcurrencyControl executor) {
        this.communicationService = communicationService;
        this.executor = executor;
    }

    void add(AggregatedClientStream<M> stream, Collection<MemberId> serverIds) {
        for (MemberId serverId : serverIds) {
            this.add(stream, serverId);
        }
    }

    void add(AggregatedClientStream<M> stream, MemberId serverId) {
        ClientStreamRegistration<M> registration = this.registrationFor(stream, serverId);
        this.add(registration);
    }

    void remove(AggregatedClientStream<M> stream, Collection<MemberId> serverIds) {
        for (MemberId serverId : serverIds) {
            this.remove(stream, serverId);
        }
    }

    void remove(AggregatedClientStream<M> stream, MemberId serverId) {
        Map<UUID, ClientStreamRegistration<M>> streamsPerHost = this.registrations.get(serverId);
        if (streamsPerHost == null) {
            return;
        }
        ClientStreamRegistration<M> registration = streamsPerHost.get(stream.streamId());
        if (registration != null) {
            this.remove(registration);
        }
    }

    void removeAll(Collection<MemberId> servers) {
        this.registrations.values().stream().flatMap(m -> m.values().stream()).forEach(ClientStreamRegistration::transitionToClosed);
        this.registrations.clear();
        servers.forEach(this::doRemoveAll);
    }

    void removeUnreliable(UUID streamId, Collection<MemberId> servers) {
        RemoveStreamRequest request = new RemoveStreamRequest().streamId(streamId);
        byte[] payload = BufferUtil.bufferAsArray((BufferWriter)request);
        servers.forEach(serverId -> {
            this.communicationService.unicast(StreamTopics.REMOVE.topic(), (Object)payload, Function.identity(), serverId, true);
            this.purgeRegistration(streamId, (MemberId)serverId);
        });
    }

    private void add(ClientStreamRegistration<M> registration) {
        if (registration.state() == ClientStreamRegistration.State.ADDING || !registration.transitionToAdding()) {
            return;
        }
        AddStreamRequest request = new AddStreamRequest().streamId(registration.streamId()).streamType((DirectBuffer)registration.logicalId().streamType()).metadata(registration.logicalId().metadata());
        CompletionStage<byte[]> pendingRequest = registration.pendingRequest();
        if (pendingRequest != null) {
            throw new IllegalStateException("Failed to add remote client stream %s to %s; there is an incomplete pending request".formatted(registration.streamId(), registration.serverId()));
        }
        byte[] payload = BufferUtil.bufferAsArray((BufferWriter)request);
        this.sendAddRequest(registration, payload);
    }

    private void remove(ClientStreamRegistration<M> registration) {
        if (registration.state() == ClientStreamRegistration.State.INITIAL) {
            registration.transitionToRemoved();
            return;
        }
        if (registration.state() == ClientStreamRegistration.State.REMOVING || !registration.transitionToRemoving()) {
            return;
        }
        RemoveStreamRequest request = new RemoveStreamRequest().streamId(registration.streamId());
        byte[] payload = BufferUtil.bufferAsArray((BufferWriter)request);
        CompletionStage<byte[]> pendingRequest = registration.pendingRequest();
        if (pendingRequest == null) {
            this.sendRemoveRequest(registration, payload);
            return;
        }
        pendingRequest.whenCompleteAsync((ok, error) -> this.sendRemoveRequest(registration, payload), arg_0 -> ((ConcurrencyControl)this.executor).run(arg_0));
    }

    void onServerRemoved(MemberId serverId) {
        Map<UUID, ClientStreamRegistration<M>> perHost = this.registrations.remove(serverId);
        if (perHost == null) {
            return;
        }
        LOGGER.trace("Closing all registrations for server {}", (Object)serverId);
        perHost.values().forEach(ClientStreamRegistration::transitionToClosed);
    }

    @VisibleForTesting(value="Allows easier test set up and validation")
    ClientStreamRegistration<M> registrationFor(AggregatedClientStream<M> stream, MemberId serverId) {
        Map streamsPerHost = this.registrations.computeIfAbsent(serverId, ignored -> new HashMap());
        return streamsPerHost.computeIfAbsent(stream.streamId(), streamId -> new ClientStreamRegistration(stream, serverId));
    }

    private void sendAddRequest(ClientStreamRegistration<M> registration, byte[] request) {
        if (registration.state() != ClientStreamRegistration.State.ADDING) {
            return;
        }
        CompletableFuture pendingRequest = this.communicationService.send(StreamTopics.ADD.topic(), (Object)request, Function.identity(), Function.identity(), registration.serverId(), REQUEST_TIMEOUT);
        registration.setPendingRequest(pendingRequest);
        pendingRequest.whenCompleteAsync((response, error) -> this.handleAddResponse(registration, request, (byte[])response, (Throwable)error), arg_0 -> ((ConcurrencyControl)this.executor).run(arg_0));
    }

    private void handleAddResponse(ClientStreamRegistration<M> registration, byte[] request, byte[] responseBuffer, Throwable error) {
        Object failure;
        ClientStreamRegistration.State state = registration.state();
        if (state != ClientStreamRegistration.State.ADDING) {
            LOGGER.trace("Skip handling ADD response since the state is {}", (Object)state, (Object)error);
            return;
        }
        if (error == null) {
            Either<ErrorResponse, AddStreamResponse> response = this.responseDecoder.decode(responseBuffer, new AddStreamResponse());
            if (response.isRight()) {
                registration.transitionToAdded();
                return;
            }
            failure = ((ErrorResponse)response.getLeft()).asException();
        } else {
            failure = error;
        }
        LOGGER.warn("Failed to add stream {} on {}; will retry in {}", new Object[]{registration.streamId(), registration.serverId(), RETRY_DELAY, failure});
        this.executor.schedule(RETRY_DELAY, () -> this.sendAddRequest(registration, request));
    }

    private void sendRemoveRequest(ClientStreamRegistration<M> registration, byte[] request) {
        if (registration.state() != ClientStreamRegistration.State.REMOVING) {
            return;
        }
        CompletableFuture pendingRequest = this.communicationService.send(StreamTopics.REMOVE.topic(), (Object)request, Function.identity(), Function.identity(), registration.serverId(), REQUEST_TIMEOUT);
        registration.setPendingRequest(pendingRequest);
        pendingRequest.whenCompleteAsync((response, error) -> this.handleRemoveResponse(registration, request, (byte[])response, (Throwable)error), arg_0 -> ((ConcurrencyControl)this.executor).run(arg_0));
    }

    private void handleRemoveResponse(ClientStreamRegistration<M> registration, byte[] request, byte[] responseBuffer, Throwable error) {
        Object failure;
        ClientStreamRegistration.State state = registration.state();
        if (state != ClientStreamRegistration.State.REMOVING) {
            LOGGER.trace("Skip handling REMOVE response since the state is {}", (Object)state, (Object)error);
            return;
        }
        if (error == null) {
            Either<ErrorResponse, RemoveStreamResponse> response = this.responseDecoder.decode(responseBuffer, new RemoveStreamResponse());
            if (response.isRight()) {
                registration.transitionToRemoved();
                this.purgeRegistration(registration.streamId(), registration.serverId());
                return;
            }
            failure = ((ErrorResponse)response.getLeft()).asException();
        } else {
            failure = error;
        }
        Throwable throwable = failure;
        Objects.requireNonNull(throwable);
        Throwable throwable2 = throwable;
        int n = 0;
        switch (SwitchBootstraps.typeSwitch("typeSwitch", new Object[]{UnrecoverableException.class, MessagingException.RemoteHandlerFailure.class, MessagingException.NoSuchMemberException.class, MessagingException.ProtocolException.class}, (Object)throwable2, n)) {
            case 0: {
                UnrecoverableException e = (UnrecoverableException)throwable2;
                this.handleUnrecoverableExceptionOnRemove(registration, (Throwable)e);
                break;
            }
            case 1: {
                MessagingException.RemoteHandlerFailure e = (MessagingException.RemoteHandlerFailure)throwable2;
                this.handleUnrecoverableExceptionOnRemove(registration, (Throwable)e);
                break;
            }
            case 2: {
                MessagingException.NoSuchMemberException e = (MessagingException.NoSuchMemberException)throwable2;
                this.handleUnrecoverableExceptionOnRemove(registration, (Throwable)e);
                break;
            }
            case 3: {
                MessagingException.ProtocolException e = (MessagingException.ProtocolException)throwable2;
                this.handleUnrecoverableExceptionOnRemove(registration, (Throwable)e);
                break;
            }
            default: {
                LOGGER.debug("Failed to remove remote stream {} on {}, will retry in {}", new Object[]{registration.streamId(), registration.serverId(), RETRY_DELAY, failure});
                this.executor.schedule(RETRY_DELAY, () -> this.sendRemoveRequest(registration, request));
            }
        }
    }

    private void handleUnrecoverableExceptionOnRemove(ClientStreamRegistration<M> registration, Throwable e) {
        LOGGER.debug("Failed to remove stream '{}' for member '{}'; unrecoverable error occurred on recipient\nside, will not retry.", new Object[]{registration.streamId(), registration.serverId(), e});
        registration.transitionToRemoved();
        this.purgeRegistration(registration.streamId(), registration.serverId());
    }

    private void purgeRegistration(UUID streamId, MemberId serverId) {
        Map<UUID, ClientStreamRegistration<M>> perHost = this.registrations.get(serverId);
        if (perHost != null) {
            ClientStreamRegistration<M> registration = perHost.remove(streamId);
            if (registration != null) {
                registration.transitionToClosed();
            }
            if (perHost.isEmpty()) {
                this.registrations.remove(serverId);
            }
        }
    }

    private void doRemoveAll(MemberId brokerId) {
        this.communicationService.unicast(StreamTopics.REMOVE_ALL.topic(), (Object)REMOVE_ALL_REQUEST, Function.identity(), brokerId, true);
    }
}

