package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import io.axoniq.axonserver.connector.control.ProcessorInstructionHandler;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.control.EventProcessorInfo;
import io.axoniq.axonserver.grpc.control.Heartbeat;
import io.axoniq.axonserver.grpc.control.PlatformInboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformOutboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformServiceGrpc;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/impl/ControlChannelImpl.class */
public class ControlChannelImpl extends AbstractAxonServerChannel implements ControlChannel {
    private static final Logger logger = LoggerFactory.getLogger(ControlChannelImpl.class);
    private final ClientIdentification clientIdentification;
    private final ScheduledExecutorService executor;
    private final long processorUpdateFrequency;
    private final AtomicReference<StreamObserver<PlatformInboundInstruction>> instructionDispatcher;
    private final Map<PlatformOutboundInstruction.RequestCase, InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction>> instructionHandlers;
    private final HeartbeatMonitor heartbeatMonitor;
    private final Map<String, CompletableFuture<InstructionAck>> awaitingAck;
    private final String context;
    private final Map<String, ProcessorInstructionHandler> processorInstructionHandlers;
    private final Map<String, Supplier<EventProcessorInfo>> processorInfoSuppliers;
    private final AtomicBoolean infoSupplierActive;
    private final PlatformServiceGrpc.PlatformServiceStub platformServiceStub;

    /* loaded from: input_file:io/axoniq/axonserver/connector/impl/ControlChannelImpl$AckHandler.class */
    /**
     * Handles {@code ACK} instructions received from the server by completing the future that was
     * registered in {@code awaitingAck} under the acknowledged instruction id.
     * <p>
     * An ACK for an id with no pending future is logged and otherwise ignored.
     */
    private class AckHandler implements InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction> {

        /**
         * Completes the pending future for the acknowledged instruction, if one is registered.
         *
         * @param platformOutboundInstruction the instruction carrying the ACK
         * @param replyChannel                not used by this handler
         */
        @Override
        public void handle(PlatformOutboundInstruction platformOutboundInstruction, ReplyChannel<PlatformInboundInstruction> replyChannel) {
            InstructionAck ack = platformOutboundInstruction.getAck();
            String instructionId = ack.getInstructionId();
            ControlChannelImpl.logger.info("Received ACK for {}", instructionId);
            // remove() both looks up and de-registers the future, so it is completed at most once from here.
            CompletableFuture<InstructionAck> pending = ControlChannelImpl.this.awaitingAck.remove(instructionId);
            if (pending != null) {
                pending.complete(ack);
            }
        }
    }

    /* loaded from: input_file:io/axoniq/axonserver/connector/impl/ControlChannelImpl$PlatformOutboundInstructionHandler.class */
    private class PlatformOutboundInstructionHandler extends AbstractIncomingInstructionStream<PlatformOutboundInstruction, PlatformInboundInstruction> {
        public PlatformOutboundInstructionHandler(String str, Consumer<Throwable> consumer) {
            super(str, 0, 0, consumer);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public PlatformInboundInstruction buildAckMessage(InstructionAck instructionAck) {
            return PlatformInboundInstruction.newBuilder().setAck(instructionAck).m974build();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public String getInstructionId(PlatformOutboundInstruction platformOutboundInstruction) {
            return platformOutboundInstruction.getInstructionId();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction> getHandler(PlatformOutboundInstruction platformOutboundInstruction) {
            return (InstructionHandler) ControlChannelImpl.this.instructionHandlers.get(platformOutboundInstruction.getRequestCase());
        }

        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        protected boolean unregisterOutboundStream(StreamObserver<PlatformInboundInstruction> streamObserver) {
            ControlChannelImpl.this.heartbeatMonitor.pause();
            boolean compareAndSet = ControlChannelImpl.this.instructionDispatcher.compareAndSet(streamObserver, null);
            if (compareAndSet) {
                ControlChannelImpl.this.failOpenInstructions(new AxonServerException(ErrorCategory.INSTRUCTION_ACK_ERROR, "Disconnected from AxonServer before receiving instruction ACK", clientId()));
            }
            return compareAndSet;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.FlowControlledStream
        public PlatformInboundInstruction buildFlowControlMessage(FlowControl flowControl) {
            return null;
        }
    }

    /**
     * Creates a control channel and registers the built-in instruction handlers (ACK, heartbeat and the
     * event processor instructions). No stream is opened until {@link #connect()} is called.
     *
     * @param clientIdentification     identification sent in the register instruction when connecting
     * @param str                      context name, used when logging the connection
     * @param scheduledExecutorService executor handed to the heartbeat monitor and used to schedule
     *                                 periodic processor info updates
     * @param axonServerManagedChannel channel the gRPC stub is created on; its {@code forceReconnect} is
     *                                 handed to the heartbeat monitor
     * @param j                        delay, in milliseconds, between two rounds of processor info updates
     */
    public ControlChannelImpl(ClientIdentification clientIdentification, String str, ScheduledExecutorService scheduledExecutorService, AxonServerManagedChannel axonServerManagedChannel, long j) {
        super(scheduledExecutorService, axonServerManagedChannel);
        this.instructionDispatcher = new AtomicReference<>();
        this.instructionHandlers = new EnumMap<>(PlatformOutboundInstruction.RequestCase.class);
        this.awaitingAck = new ConcurrentHashMap<>();
        this.processorInstructionHandlers = new ConcurrentHashMap<>();
        this.processorInfoSuppliers = new ConcurrentHashMap<>();
        this.infoSupplierActive = new AtomicBoolean();
        this.clientIdentification = clientIdentification;
        this.context = str;
        this.executor = scheduledExecutorService;
        this.processorUpdateFrequency = j;

        HeartbeatSender heartbeatSender = this::sendHeartBeat;
        // Evaluating the method reference already fails fast on a null channel, so no separate null check.
        this.heartbeatMonitor = new HeartbeatMonitor(scheduledExecutorService, heartbeatSender, axonServerManagedChannel::forceReconnect);

        this.instructionHandlers.computeIfAbsent(PlatformOutboundInstruction.RequestCase.ACK,
                key -> new AckHandler());
        this.instructionHandlers.computeIfAbsent(PlatformOutboundInstruction.RequestCase.HEARTBEAT,
                key -> (platformOutboundInstruction, replyChannel) -> this.heartbeatMonitor.handleIncomingBeat(replyChannel));
        this.instructionHandlers.computeIfAbsent(PlatformOutboundInstruction.RequestCase.MERGE_EVENT_PROCESSOR_SEGMENT,
                key -> ProcessorInstructions.mergeHandler(this.processorInstructionHandlers));
        this.instructionHandlers.computeIfAbsent(PlatformOutboundInstruction.RequestCase.SPLIT_EVENT_PROCESSOR_SEGMENT,
                key -> ProcessorInstructions.splitHandler(this.processorInstructionHandlers));
        this.instructionHandlers.computeIfAbsent(PlatformOutboundInstruction.RequestCase.START_EVENT_PROCESSOR,
                key -> ProcessorInstructions.startHandler(this.processorInstructionHandlers));
        this.instructionHandlers.computeIfAbsent(PlatformOutboundInstruction.RequestCase.PAUSE_EVENT_PROCESSOR,
                key -> ProcessorInstructions.pauseHandler(this.processorInstructionHandlers));
        this.instructionHandlers.computeIfAbsent(PlatformOutboundInstruction.RequestCase.RELEASE_SEGMENT,
                key -> ProcessorInstructions.releaseSegmentHandler(this.processorInstructionHandlers));
        this.instructionHandlers.computeIfAbsent(PlatformOutboundInstruction.RequestCase.REQUEST_EVENT_PROCESSOR_INFO,
                key -> ProcessorInstructions.requestInfoHandler(this.processorInfoSuppliers));

        this.platformServiceStub = PlatformServiceGrpc.newStub(axonServerManagedChannel);
    }

    /**
     * Opens the instruction stream and registers this client, unless a stream is already present.
     * <p>
     * The new outbound stream replaces the current dispatcher; a previously registered stream, if any,
     * is completed. After the client identification has been sent the heartbeat monitor is resumed.
     * If sending the identification fails, the dispatcher is cleared again and the error is reported on
     * the outbound stream.
     */
    @Override
    public synchronized void connect() {
        if (this.instructionDispatcher.get() != null) {
            logger.info("Not connecting - connection already present");
            return;
        }
        // Declared with its concrete type: getInstructionsForPlatform() is not part of StreamObserver.
        PlatformOutboundInstructionHandler incomingStream = new PlatformOutboundInstructionHandler(this.clientIdentification.getClientId(), this::handleDisconnect);
        logger.debug("Opening instruction stream");
        this.platformServiceStub.openStream(incomingStream);
        StreamObserver<PlatformInboundInstruction> instructionsForPlatform = incomingStream.getInstructionsForPlatform();
        ObjectUtils.silently(this.instructionDispatcher.getAndSet(instructionsForPlatform), previous -> previous.onCompleted());
        try {
            logger.info("Connected instruction stream for context '{}'. Sending client identification", this.context);
            PlatformInboundInstruction registration = PlatformInboundInstruction.newBuilder()
                    .setRegister(this.clientIdentification)
                    .m974build();
            instructionsForPlatform.onNext(registration);
            this.heartbeatMonitor.resume();
        } catch (Exception e) {
            // Roll back so a later connect() attempt is not rejected as "already present".
            this.instructionDispatcher.set(null);
            instructionsForPlatform.onError(e);
        }
    }

    /**
     * Disconnect callback handed to the incoming instruction stream: fails every instruction that is
     * still waiting for an ACK with the given cause, then schedules a reconnect.
     *
     * @param cause the reason for the disconnect
     */
    private void handleDisconnect(Throwable cause) {
        this.failOpenInstructions(cause);
        this.scheduleReconnect();
    }

    /**
     * Exceptionally completes every future registered in {@code awaitingAck} with the given cause and
     * removes it from the map. The outer loop repeats until the map is observed empty, so entries added
     * while a pass is running are failed as well.
     *
     * @param th the cause to complete the pending futures with
     */
    public void failOpenInstructions(Throwable th) {
        while (!this.awaitingAck.isEmpty()) {
            for (String instructionId : this.awaitingAck.keySet()) {
                // remove() may return null when the entry was taken by another thread in the meantime.
                ObjectUtils.doIfNotNull(this.awaitingAck.remove(instructionId), pending -> {
                    pending.completeExceptionally(th);
                });
            }
        }
    }

    /**
     * Disables the heartbeat and completes the current outbound instruction stream, if there is one.
     * The dispatcher reference is cleared in the same step.
     */
    @Override
    public void disconnect() {
        this.heartbeatMonitor.disableHeartbeat();
        StreamObserver<PlatformInboundInstruction> previous = this.instructionDispatcher.getAndSet(null);
        if (previous == null) {
            return;
        }
        previous.onCompleted();
    }

    /**
     * Registers the given handler for the given request case, replacing any handler previously
     * registered for that case.
     *
     * @param requestCase        the type of instruction to handle
     * @param instructionHandler the handler to invoke for instructions of that type
     * @return a registration that removes the handler again, but only while it still is the one
     *         registered for the request case
     */
    @Override
    public Registration registerInstructionHandler(PlatformOutboundInstruction.RequestCase requestCase, InstructionHandler instructionHandler) {
        this.instructionHandlers.put(requestCase, instructionHandler);
        Registration registration = () -> this.instructionHandlers.remove(requestCase, instructionHandler);
        return registration;
    }

    /**
     * Registers an event processor under the given name, together with the supplier of its status
     * information and the handler for instructions addressed to it. If no periodic processor info
     * reporting is active yet, this call starts it.
     *
     * @param str                         the name the processor is registered under
     * @param supplier                    supplies the processor's current information
     * @param processorInstructionHandler handles instructions for this processor
     * @return a registration that removes the handler and the supplier again, each only while it still
     *         is the one registered under the given name
     */
    @Override
    public Registration registerEventProcessor(String str, Supplier<EventProcessorInfo> supplier, ProcessorInstructionHandler processorInstructionHandler) {
        this.processorInstructionHandlers.put(str, processorInstructionHandler);
        this.processorInfoSuppliers.put(str, supplier);
        // Only the caller that flips the flag starts the reporting loop.
        boolean startsReporting = this.infoSupplierActive.compareAndSet(false, true);
        if (startsReporting) {
            sendScheduledProcessorInfo();
        }
        Registration registration = () -> {
            this.processorInstructionHandlers.remove(str, processorInstructionHandler);
            this.processorInfoSuppliers.remove(str, supplier);
        };
        return registration;
    }

    /**
     * One round of the periodic processor info reporting.
     * <p>
     * While suppliers are registered, the info of each is sent (suppliers returning {@code null} are
     * skipped) and the next round is scheduled after {@code processorUpdateFrequency} milliseconds.
     * Without suppliers the reporting is marked inactive and no new round is scheduled.
     */
    private void sendScheduledProcessorInfo() {
        Collection<Supplier<EventProcessorInfo>> infoSuppliers = this.processorInfoSuppliers.values();
        if (infoSuppliers.isEmpty()) {
            this.infoSupplierActive.set(false);
            // A supplier may have been registered between the emptiness check and clearing the flag;
            // in that case re-claim the flag and carry on so that registration is not left unreported.
            if (!this.processorInfoSuppliers.isEmpty() && this.infoSupplierActive.compareAndSet(false, true)) {
                sendScheduledProcessorInfo();
            }
            return;
        }
        for (Supplier<EventProcessorInfo> infoSupplier : infoSuppliers) {
            ObjectUtils.doIfNotNull(infoSupplier.get(), this::sendProcessorInfo);
        }
        this.executor.schedule(this::sendScheduledProcessorInfo, this.processorUpdateFrequency, TimeUnit.MILLISECONDS);
    }

    /**
     * Enables heartbeat monitoring by delegating to the channel's heartbeat monitor.
     * NOTE(review): {@code j} and {@code j2} are presumably the heartbeat interval and timeout - confirm
     * against {@code HeartbeatMonitor.enableHeartbeat}.
     *
     * @param j        first duration passed to the heartbeat monitor
     * @param j2       second duration passed to the heartbeat monitor
     * @param timeUnit the unit of both durations
     */
    @Override // io.axoniq.axonserver.connector.control.ControlChannel
    public void enableHeartbeat(long j, long j2, TimeUnit timeUnit) {
        this.heartbeatMonitor.enableHeartbeat(j, j2, timeUnit);
    }

    /**
     * Disables heartbeat monitoring by delegating to the channel's heartbeat monitor.
     */
    @Override // io.axoniq.axonserver.connector.control.ControlChannel
    public void disableHeartbeat() {
        this.heartbeatMonitor.disableHeartbeat();
    }

    /**
     * Sends the given event processor information to the server as an instruction.
     * The instruction is built without an instruction id.
     *
     * @param eventProcessorInfo the processor information to send
     * @return the future returned by {@link #sendInstruction(PlatformInboundInstruction)}
     */
    public CompletableFuture<InstructionAck> sendProcessorInfo(EventProcessorInfo eventProcessorInfo) {
        PlatformInboundInstruction processorInfoInstruction = PlatformInboundInstruction.newBuilder()
                .setEventProcessorInfo(eventProcessorInfo)
                .m974build();
        return sendInstruction(processorInfoInstruction);
    }

    /**
     * Sends a heartbeat instruction with a freshly generated random instruction id.
     *
     * @return the future returned by {@link #sendInstruction(PlatformInboundInstruction)}, or a future
     *         already completed with {@code null} when the channel is not connected
     */
    public CompletableFuture<InstructionAck> sendHeartBeat() {
        if (!isConnected()) {
            // Nothing to send on; report "no ACK" instead of failing.
            return CompletableFuture.completedFuture(null);
        }
        PlatformInboundInstruction heartbeat = PlatformInboundInstruction.newBuilder()
                .setInstructionId(UUID.randomUUID().toString())
                .setHeartbeat(Heartbeat.getDefaultInstance())
                .m974build();
        return sendInstruction(heartbeat);
    }

    /**
     * Sends the given instruction over the current outbound instruction stream.
     * <ul>
     *   <li>Without an active stream the returned future completes exceptionally with an
     *       {@link AxonServerException} of category {@code INSTRUCTION_EXECUTION_ERROR}.</li>
     *   <li>An instruction carrying an instruction id is registered in {@code awaitingAck}; its future
     *       completes when the matching ACK arrives or when pending instructions are failed.</li>
     *   <li>An instruction without an instruction id cannot be correlated with an ACK, so its future
     *       completes with {@code null} as soon as it has been handed to the stream, rather than staying
     *       incomplete forever.</li>
     *   <li>If handing the instruction to the stream throws, the future completes exceptionally with that
     *       exception and the error is reported on the stream.</li>
     * </ul>
     *
     * @param platformInboundInstruction the instruction to send
     * @return a future reflecting the outcome as described above
     */
    @Override
    public CompletableFuture<InstructionAck> sendInstruction(PlatformInboundInstruction platformInboundInstruction) {
        CompletableFuture<InstructionAck> result = new CompletableFuture<>();
        String instructionId = platformInboundInstruction.getInstructionId();
        if (logger.isDebugEnabled()) {
            logger.debug("Sending instruction: {} {}", platformInboundInstruction.getRequestCase().name(), instructionId);
        }
        StreamObserver<PlatformInboundInstruction> dispatcher = this.instructionDispatcher.get();
        if (dispatcher == null) {
            result.completeExceptionally(new AxonServerException(ErrorCategory.INSTRUCTION_EXECUTION_ERROR, "Unable to send instruction", this.clientIdentification.getClientId()));
            return result;
        }

        boolean expectsAck = ObjectUtils.hasLength(instructionId);
        if (expectsAck) {
            // Register before sending so an ACK arriving right after onNext always finds the future.
            this.awaitingAck.put(instructionId, result);
        }
        try {
            dispatcher.onNext(platformInboundInstruction);
            if (!expectsAck) {
                // No id means no ACK can ever be matched to this future; complete it now.
                result.complete(null);
            }
        } catch (Exception e) {
            if (expectsAck) {
                // Two-argument remove: never drop a future another caller registered under the same id.
                this.awaitingAck.remove(instructionId, result);
            }
            result.completeExceptionally(e);
            ObjectUtils.silently(dispatcher, failedStream -> {
                failedStream.onError(e);
            });
        }
        return result;
    }

    /**
     * Reports whether an outbound instruction stream is currently registered.
     *
     * @return {@code true} if a dispatcher is present, {@code false} otherwise
     */
    @Override
    public boolean isConnected() {
        StreamObserver<PlatformInboundInstruction> currentDispatcher = this.instructionDispatcher.get();
        return currentDispatcher != null;
    }
}
