/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.gateway.impl.stream;

import io.camunda.zeebe.gateway.ResponseMapper;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJobImpl;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.transport.stream.api.ClientStreamBlockedException;
import io.camunda.zeebe.transport.stream.api.ClientStreamConsumer;
import io.camunda.zeebe.transport.stream.api.ClientStreamId;
import io.camunda.zeebe.transport.stream.api.ClientStreamer;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.Executor;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles requests to open a stream of activated jobs towards a gRPC client.
 *
 * <p>For every valid request a {@link JobStreamConsumer} is registered with the {@link
 * ClientStreamer}; payloads pushed to that consumer are converted and forwarded to the request's
 * response observer. An {@link AsyncJobStreamRemover} is installed as the observer's close and
 * cancel handler so that the registered stream is removed again once the call ends.
 */
public class StreamJobsHandler extends Actor {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamJobsHandler.class);
    private final ClientStreamer<JobActivationProperties> jobStreamer;

    /**
     * @param jobStreamer the streamer with which job streams are registered and from which they
     *     are removed
     */
    public StreamJobsHandler(final ClientStreamer<JobActivationProperties> jobStreamer) {
        this.jobStreamer = jobStreamer;
    }

    /**
     * Validates the request and, if it is valid, registers a new job stream for it.
     *
     * <p>A blank {@code jobType} or a timeout lower than 1 is reported to the client as {@code
     * INVALID_ARGUMENT} through {@code responseObserver.onError}; no stream is registered in that
     * case.
     *
     * @param jobType the type of jobs to stream; must be non-null and not blank
     * @param jobActivationProperties the activation properties sent along as stream metadata
     * @param responseObserver the observer of the client call which receives the activated jobs
     */
    public void handle(
            final String jobType,
            final JobActivationProperties jobActivationProperties,
            final ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> responseObserver) {
        if (jobType.isBlank()) {
            handleError(responseObserver, "type", "present", "blank");
            return;
        }

        final long timeout = jobActivationProperties.timeout();
        if (timeout < 1L) {
            handleError(responseObserver, "timeout", "greater than zero", Long.toString(timeout));
            return;
        }

        handleInternal(jobType, jobActivationProperties, responseObserver);
    }

    /**
     * Wires up the consumer and the cleanup handler for an already validated request, then
     * registers the stream from within the actor.
     */
    private void handleInternal(
            final String jobType,
            final JobActivationProperties jobActivationProperties,
            final ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> responseObserver) {
        final DirectBuffer streamType = BufferUtil.wrapString(jobType);
        final JobStreamConsumer consumer = new JobStreamConsumer(responseObserver, actor);
        final AsyncJobStreamRemover cleaner = new AsyncJobStreamRemover(jobStreamer, actor);

        // The handlers are installed before the stream is registered; if the call ends before the
        // stream ID is known, the cleaner remembers that and removes the stream as soon as the ID
        // arrives (see AsyncJobStreamRemover#setStreamId).
        responseObserver.setOnCloseHandler(cleaner);
        responseObserver.setOnCancelHandler(cleaner);

        actor.run(
                () ->
                        actor.runOnCompletion(
                                jobStreamer.add(streamType, jobActivationProperties, consumer),
                                (streamId, error) ->
                                        onStreamAdded(responseObserver, cleaner, streamId, error)));
    }

    /**
     * Completion callback of the stream registration.
     *
     * <p>On failure the error is logged and reported to the client as {@code UNAVAILABLE}; on
     * success the stream ID is handed to the cleaner so it can remove the stream later.
     */
    private void onStreamAdded(
            final StreamObserver<GatewayOuterClass.ActivatedJob> responseObserver,
            final AsyncJobStreamRemover cleaner,
            final ClientStreamId streamId,
            final Throwable error) {
        if (error != null) {
            LOGGER.warn("Failed to register new job stream", error);
            responseObserver.onError(
                    Status.UNAVAILABLE
                            .withDescription("Failed to register new job stream")
                            .withCause(error)
                            .augmentDescription("Cause: " + error.getMessage())
                            .asRuntimeException());
            return;
        }

        cleaner.streamId(streamId);
    }

    /**
     * Reports a request validation failure to the client as {@code INVALID_ARGUMENT}.
     *
     * @param field the name of the offending request field
     * @param expectation a description of what the field was expected to be
     * @param actual a description of what the field actually was
     */
    private void handleError(
            final ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> responseObserver,
            final String field,
            final String expectation,
            final String actual) {
        final String errorMessage =
                "Expected to stream activated jobs with %s to be %s, but it was %s"
                        .formatted(field, expectation, actual);
        responseObserver.onError(
                Status.INVALID_ARGUMENT.withDescription(errorMessage).asRuntimeException());
    }

    /**
     * Consumer which forwards pushed job payloads to a client's response observer.
     *
     * <p>The actual forwarding is not done on the pushing thread but scheduled on the given
     * executor.
     */
    static final class JobStreamConsumer
            implements ClientStreamConsumer {
        private final ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> responseObserver;
        private final ConcurrencyControl executor;

        JobStreamConsumer(
                final ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> responseObserver,
                final ConcurrencyControl executor) {
            this.responseObserver = responseObserver;
            this.executor = executor;
        }

        /**
         * Schedules the given payload to be forwarded to the client.
         *
         * @param payload the serialized job to push
         * @return a future which is completed once the job was handed to the response observer,
         *     or completed exceptionally if the job could not be pushed
         */
        public ActorFuture<Void> push(final DirectBuffer payload) {
            final CompletableActorFuture<Void> result = new CompletableActorFuture<>();
            try {
                executor.run(() -> handlePushedJob(payload, result));
            } catch (final Exception e) {
                // scheduling itself failed: close the client call and fail the push
                responseObserver.onError(e);
                result.completeExceptionally(e);
            }

            return result;
        }

        /**
         * Deserializes the payload and passes the resulting job to the response observer,
         * completing {@code result} according to the outcome.
         */
        private void handlePushedJob(
                final DirectBuffer payload, final CompletableActorFuture<Void> result) {
            // Fail fast when the observer is not ready, before allocating or deserializing
            // anything for a job which will not be sent.
            if (!responseObserver.isReady()) {
                result.completeExceptionally(
                        new ClientStreamBlockedException(
                                "Expected to push payload (size = '%d') to stream, but stream is blocked"
                                        .formatted(payload.capacity())));
                return;
            }

            GatewayOuterClass.ActivatedJob activatedJob;
            try {
                final ActivatedJobImpl deserializedJob = new ActivatedJobImpl();
                deserializedJob.wrap(payload);
                activatedJob = ResponseMapper.toActivatedJob(deserializedJob);
            } catch (final Exception e) {
                // A payload which cannot be converted only fails this push; the client call is
                // deliberately left open.
                result.completeExceptionally(e);
                return;
            }

            try {
                responseObserver.onNext(activatedJob);
                result.complete(null);
            } catch (final Exception e) {
                responseObserver.onError(e);
                result.completeExceptionally(e);
            }
        }
    }

    /**
     * Removes a job stream from the streamer once the associated client call has ended.
     *
     * <p>Both {@link #run()} and {@link #streamId(ClientStreamId)} only submit a task to the
     * executor; the mutable state below is read and written exclusively inside those tasks and is
     * not otherwise synchronized. NOTE(review): this presumably relies on the executor running
     * its tasks one at a time (it is the owning actor in this file) - confirm before passing any
     * other executor.
     */
    static final class AsyncJobStreamRemover
            implements Runnable {
        private final ClientStreamer<JobActivationProperties> jobStreamer;
        private final Executor executor;

        // set once removal was requested, even if the stream ID is not known yet
        private boolean isRemoved;
        // stays null until the stream registration has completed successfully
        private ClientStreamId streamId;

        AsyncJobStreamRemover(
                final ClientStreamer<JobActivationProperties> jobStreamer, final Executor executor) {
            this.jobStreamer = jobStreamer;
            this.executor = executor;
        }

        /** Requests removal of the stream; the removal itself is executed on the executor. */
        @Override
        public void run() {
            executor.execute(this::remove);
        }

        /**
         * Makes the ID of the registered stream known; the assignment is executed on the
         * executor.
         *
         * @param streamId the ID under which the stream was registered
         */
        void streamId(final ClientStreamId streamId) {
            executor.execute(() -> setStreamId(streamId));
        }

        private void remove() {
            isRemoved = true;
            if (streamId != null) {
                jobStreamer.remove(streamId);
            }
        }

        private void setStreamId(final ClientStreamId streamId) {
            if (isRemoved) {
                // removal was requested before the ID was known: remove right away instead of
                // storing the ID, otherwise the stream would never be cleaned up
                jobStreamer.remove(streamId);
                return;
            }

            this.streamId = streamId;
        }
    }
}

