package io.camunda.zeebe.process.test.engine;

import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/process/test/engine/EngineStateMonitor.class */
/**
 * Watches a {@link StreamProcessor} and runs one-shot callbacks when it is observed to be idle or
 * processing.
 *
 * <p>The monitor registers itself as a commit listener on the log storage. Every commit runs the
 * pending "processing" callbacks and, if any callback is still pending, (re)starts a periodic
 * check on a shared timer. The check runs every {@link #GRACE_PERIOD_MS} milliseconds:
 *
 * <ul>
 *   <li>when the stream processor reports that processing has reached the end for
 *       {@link #NOTIFICATION_THRESHOLD} consecutive checks, the pending "idle" callbacks run;
 *   <li>when it reports otherwise, the consecutive-idle counter resets and the pending
 *       "processing" callbacks run.
 * </ul>
 *
 * <p>Callbacks are removed from their list before they run, so each registered callback runs at
 * most once. Callbacks run without any monitor lock held, which lets a callback register further
 * callbacks. A callback that throws a {@link RuntimeException} is logged and does not prevent the
 * remaining callbacks from running.
 */
final class EngineStateMonitor implements LogStorage.CommitListener {
    private static final Logger LOG = LoggerFactory.getLogger(EngineStateMonitor.class);

    /** Timer shared by all monitor instances; a task that throws would terminate its thread. */
    private static final Timer TIMER = new Timer();

    /** Initial delay and period, in milliseconds, of the periodic state check. */
    private static final int GRACE_PERIOD_MS = 50;

    /** Number of consecutive idle observations required before idle callbacks are run. */
    private static final int NOTIFICATION_THRESHOLD = 2;

    /** Pending idle callbacks; every access must synchronize on the list itself. */
    private final List<Runnable> idleCallbacks = new ArrayList<>();

    /** Pending processing callbacks; every access must synchronize on the list itself. */
    private final List<Runnable> processingCallbacks = new ArrayList<>();

    private final StreamProcessor streamProcessor;

    /** The currently scheduled periodic check, or {@code null} if none was scheduled yet. */
    private volatile TimerTask stateNotifier;

    /**
     * Creates the monitor and registers it as a commit listener.
     *
     * @param inMemoryLogStorage log storage whose commits trigger state checks
     * @param streamProcessor stream processor whose state is observed
     */
    public EngineStateMonitor(InMemoryLogStorage inMemoryLogStorage, StreamProcessor streamProcessor) {
        // Assign state before publishing `this`, so a commit delivered right after registration
        // never observes a partially initialised monitor.
        this.streamProcessor = streamProcessor;
        inMemoryLogStorage.addCommitListener(this);
    }

    /**
     * Registers a callback to run once the engine is observed idle, and (re)starts the periodic
     * state check.
     *
     * @param runnable callback to run once
     */
    public void addOnIdleCallback(Runnable runnable) {
        synchronized (this.idleCallbacks) {
            this.idleCallbacks.add(runnable);
        }
        scheduleStateNotification();
    }

    /**
     * Registers a callback to run once the engine is observed processing, and (re)starts the
     * periodic state check.
     *
     * @param runnable callback to run once
     */
    public void addOnProcessingCallback(Runnable runnable) {
        synchronized (this.processingCallbacks) {
            this.processingCallbacks.add(runnable);
        }
        scheduleStateNotification();
    }

    /**
     * Cancels the current periodic check, if any, and schedules a fresh one. The fresh task starts
     * with its consecutive-idle counter at zero.
     */
    private synchronized void scheduleStateNotification() {
        final TimerTask previous = this.stateNotifier;
        if (previous != null) {
            previous.cancel();
            // Drop the cancelled task from the timer queue so it does not accumulate.
            TIMER.purge();
        }
        final TimerTask next = createStateNotifier();
        this.stateNotifier = next;
        TIMER.scheduleAtFixedRate(next, GRACE_PERIOD_MS, GRACE_PERIOD_MS);
    }

    /**
     * Asks the stream processor whether processing has reached the end.
     *
     * @return the stream processor's answer; if the query fails, whether its actor is closed
     */
    private boolean isInIdleState() {
        try {
            return ((Boolean) this.streamProcessor.hasProcessingReachedTheEnd().join()).booleanValue();
        } catch (Exception e) {
            LOG.debug("Exception occurred while checking idle state", e);
            return this.streamProcessor.isActorClosed();
        }
    }

    /**
     * Commit notification: runs the pending processing callbacks and, if any callback is still
     * pending afterwards, restarts the periodic state check.
     */
    public void onCommit() {
        notifyProcessingCallbacks();
        if (hasPendingCallbacks()) {
            scheduleStateNotification();
        }
    }

    /** Runs and removes all pending idle callbacks. */
    private void notifyIdleCallbacks() {
        drainAndRun(this.idleCallbacks);
    }

    /** Runs and removes all pending processing callbacks. */
    private void notifyProcessingCallbacks() {
        drainAndRun(this.processingCallbacks);
    }

    /** Returns whether at least one idle or processing callback is waiting to be run. */
    private boolean hasPendingCallbacks() {
        return !isEmpty(this.idleCallbacks) || !isEmpty(this.processingCallbacks);
    }

    /** Reads the list's emptiness under the same lock that guards its mutations. */
    private static boolean isEmpty(List<Runnable> callbacks) {
        synchronized (callbacks) {
            return callbacks.isEmpty();
        }
    }

    /**
     * Atomically removes every callback from {@code callbacks}, then runs them outside the lock.
     *
     * <p>Running outside the lock allows a callback to register new callbacks without corrupting
     * the iteration. Each callback is isolated so that a failing one neither skips the remaining
     * callbacks nor propagates into the timer thread.
     */
    private static void drainAndRun(List<Runnable> callbacks) {
        final List<Runnable> pending;
        synchronized (callbacks) {
            if (callbacks.isEmpty()) {
                return;
            }
            pending = new ArrayList<>(callbacks);
            callbacks.clear();
        }
        for (final Runnable callback : pending) {
            try {
                callback.run();
            } catch (RuntimeException e) {
                LOG.warn("Engine state callback threw an exception", e);
            }
        }
    }

    /**
     * Creates the periodic check. The task cancels itself as soon as no callback is pending.
     *
     * @return a new, unscheduled task with its consecutive-idle counter at zero
     */
    private TimerTask createStateNotifier() {
        return new TimerTask() {
            /** Consecutive checks that observed the idle state; only touched by the timer thread. */
            private int idleStateReachedCounter = 0;

            @Override
            public void run() {
                if (hasPendingCallbacks()) {
                    if (isInIdleState()) {
                        this.idleStateReachedCounter++;
                        if (this.idleStateReachedCounter >= NOTIFICATION_THRESHOLD) {
                            notifyIdleCallbacks();
                        }
                    } else {
                        this.idleStateReachedCounter = 0;
                        notifyProcessingCallbacks();
                    }
                }
                // Re-check: the notifications above may have emptied both lists.
                if (!hasPendingCallbacks()) {
                    cancel();
                }
            }
        };
    }
}
