package io.camunda.zeebe.process.test.engine;

import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/camunda/zeebe/process/test/engine/InMemoryJobStreamer.class */
/**
 * In-memory {@link JobStreamer} that keeps one {@link InMemoryJobStream} per registered key and
 * pushes activated jobs to the {@link JobConsumer}s attached to that stream.
 *
 * <p>Streams are stored in a {@link ConcurrentMap} and are only created, extended or dropped
 * through {@link ConcurrentMap#compute}. When a job cannot be handed to any consumer of a stream,
 * a {@code JobIntent.YIELD} command for that job is written through the {@link CommandWriter}
 * given at construction time.
 */
public final class InMemoryJobStreamer implements JobStreamer {
    private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryJobStreamer.class);

    /** Registered streams, indexed by the key passed to {@link #addStream}. */
    private final ConcurrentMap<DirectBuffer, InMemoryJobStream> streams = new ConcurrentHashMap<>();

    /** Writer used to emit the {@code YIELD} command for jobs that no consumer accepted. */
    private final CommandWriter yieldWriter;

    /**
     * A single job stream: a fixed set of activation properties plus the consumers that jobs of
     * this stream may be pushed to.
     *
     * <p>This is an inner (non-static) class because exhausted pushes call back into the
     * enclosing streamer to yield the job.
     */
    public final class InMemoryJobStream implements JobStreamer.JobStream {
        private final JobActivationProperties properties;
        private final Set<JobConsumer> consumers;

        InMemoryJobStream(JobActivationProperties jobActivationProperties, Set<JobConsumer> set) {
            this.properties = jobActivationProperties;
            this.consumers = set;
        }

        /**
         * @return the activation properties this stream was created with
         */
        public JobActivationProperties properties() {
            return this.properties;
        }

        /**
         * Tries to hand the job to one of the registered consumers.
         *
         * <p>A snapshot of the consumers is taken and shuffled, so the order in which consumers
         * are tried is random per push. Consumers are then tried one after another until one of
         * them neither fails nor reports {@link PushStatus#BLOCKED}; if the snapshot runs out,
         * the job is yielded.
         *
         * @param activatedJob the job to push
         */
        public void push(ActivatedJob activatedJob) {
            // LinkedList is used because the snapshot must be both a List (for shuffle) and a
            // Queue (for the poll-based retry chain below).
            LinkedList<JobConsumer> candidates = new LinkedList<>(this.consumers);
            Collections.shuffle(candidates);
            push(candidates, activatedJob);
        }

        /**
         * Pushes the job to the next consumer in {@code queue}, scheduling a retry with the
         * remaining consumers on failure or when the consumer reports it is blocked.
         */
        private void push(Queue<JobConsumer> queue, ActivatedJob activatedJob) {
            JobConsumer consumer = queue.poll();
            if (consumer == null) {
                InMemoryJobStreamer.LOGGER.debug("Failed to push job to clients, exhausted all known clients");
                InMemoryJobStreamer.this.yieldJob(activatedJob);
                return;
            }

            try {
                consumer.consumeJob(activatedJob).whenCompleteAsync((pushStatus, error) -> {
                    if (error != null) {
                        onPushError(queue, activatedJob, error);
                    } else if (pushStatus == PushStatus.BLOCKED) {
                        InMemoryJobStreamer.LOGGER.trace("Underlying stream or client is blocked, retrying with next consumer");
                        CompletableFuture.runAsync(() -> push(queue, activatedJob));
                    }
                    // Any other status: nothing further is done for this job here.
                });
            } catch (Exception e) {
                // consumeJob (or chaining onto its result) threw synchronously; handle it the
                // same way as an asynchronously reported failure.
                onPushError(queue, activatedJob, e);
            }
        }

        /** Logs the failure and asynchronously retries with the remaining consumers. */
        private void onPushError(Queue<JobConsumer> queue, ActivatedJob activatedJob, Throwable th) {
            InMemoryJobStreamer.LOGGER.debug("Failed to push job to client, retrying with next consumer", th);
            CompletableFuture.runAsync(() -> push(queue, activatedJob));
        }
    }

    /** Receiver of pushed jobs; reports asynchronously whether the job was taken. */
    public interface JobConsumer {
        /**
         * @param activatedJob the job being offered
         * @return a stage completing with the push outcome, or completing exceptionally on failure
         */
        CompletionStage<PushStatus> consumeJob(ActivatedJob activatedJob);
    }

    /** Outcome a {@link JobConsumer} reports for a pushed job. */
    public enum PushStatus {
        /** The consumer took the job; the streamer does nothing further. */
        PUSHED,
        /** The consumer could not take the job; the streamer retries with the next consumer. */
        BLOCKED
    }

    /**
     * @param commandWriter writer used to yield jobs that could not be pushed to any consumer
     */
    public InMemoryJobStreamer(CommandWriter commandWriter) {
        this.yieldWriter = commandWriter;
    }

    /**
     * Looks up the stream registered under {@code directBuffer}.
     *
     * @param directBuffer key the stream was registered under
     * @param predicate filter applied to the stream's activation properties
     * @return the stream if one is registered and its properties satisfy {@code predicate},
     *     otherwise an empty optional
     */
    public Optional<JobStreamer.JobStream> streamFor(DirectBuffer directBuffer, Predicate<JobActivationProperties> predicate) {
        return Optional.ofNullable(this.streams.get(directBuffer)).flatMap(stream -> {
            return predicate.test(stream.properties()) ? Optional.of(stream) : Optional.empty();
        });
    }

    /**
     * Registers {@code jobConsumer} on the stream stored under {@code directBuffer}, creating the
     * stream if none exists yet.
     *
     * <p>{@code jobActivationProperties} is only used when the stream is created; if a stream is
     * already registered under the key, its existing properties are kept.
     *
     * @param directBuffer key of the stream
     * @param jobActivationProperties properties for a newly created stream
     * @param jobConsumer consumer to attach
     */
    public void addStream(DirectBuffer directBuffer, JobActivationProperties jobActivationProperties, JobConsumer jobConsumer) {
        this.streams.compute(directBuffer, (ignoredKey, existing) -> {
            InMemoryJobStream stream = existing == null
                    ? new InMemoryJobStream(jobActivationProperties, new CopyOnWriteArraySet<>())
                    : existing;
            stream.consumers.add(jobConsumer);
            return stream;
        });
    }

    /**
     * Detaches {@code jobConsumer} from the stream stored under {@code directBuffer}. The stream
     * itself is removed once its last consumer is gone. Does nothing if no stream is registered
     * under the key.
     *
     * @param directBuffer key of the stream
     * @param jobConsumer consumer to detach
     */
    public void removeStream(DirectBuffer directBuffer, JobConsumer jobConsumer) {
        this.streams.compute(directBuffer, (ignoredKey, existing) -> {
            if (existing == null) {
                return null;
            }
            existing.consumers.remove(jobConsumer);
            // Returning null from compute removes the mapping.
            if (existing.consumers.isEmpty()) {
                return null;
            }
            return existing;
        });
    }

    /** Writes a {@code JOB}/{@code YIELD} command for the given job, keyed by its job key. */
    private void yieldJob(ActivatedJob activatedJob) {
        this.yieldWriter.writeCommandWithKey(Long.valueOf(activatedJob.jobKey()), activatedJob.jobRecord(), new RecordMetadata().intent(JobIntent.YIELD).recordType(RecordType.COMMAND).valueType(ValueType.JOB));
    }
}
