package io.camunda.zeebe.process.test.engine;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.impl.ZeebeObjectMapper;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.process.test.api.RecordStreamSource;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.grpc.Server;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/process/test/engine/InMemoryEngine.class */
/**
 * {@link ZeebeTestEngine} implementation that bundles the components handed to its constructor:
 * a gRPC server, a stream processor, the gRPC-to-log-stream gateway, the state database, the log
 * stream, the actor scheduler, a record stream source, a controllable clock and an engine state
 * monitor.
 *
 * <p>The engine does not create any of these components itself; it only starts them, stops them
 * and exposes a few of them to tests.
 */
public class InMemoryEngine implements ZeebeTestEngine {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryEngine.class);
    private final Server grpcServer;
    private final StreamProcessor streamProcessor;
    private final GrpcToLogStreamGateway gateway;
    private final ZeebeDb<ZbColumnFamilies> database;
    private final LogStream logStream;
    private final ActorScheduler scheduler;
    private final RecordStreamSource recordStream;
    private final ControlledActorClock clock;
    private final EngineStateMonitor engineStateMonitor;

    /**
     * Creates an engine from already constructed components. Nothing is started here; call
     * {@link #start()} afterwards.
     *
     * @param server gRPC server that is started and shut down together with the engine
     * @param streamProcessor stream processor that is opened on start and closed on stop
     * @param grpcToLogStreamGateway gateway used to look up the address clients connect to
     * @param zeebeDb database that is closed on stop
     * @param logStream log stream that is closed on stop
     * @param actorScheduler scheduler that is stopped on stop
     * @param recordStreamSource record stream handed out by {@link #getRecordStreamSource()}
     * @param controlledActorClock clock that is advanced by {@link #increaseTime(Duration)}
     * @param engineStateMonitor monitor used to wait for idle/busy notifications
     */
    public InMemoryEngine(Server server, StreamProcessor streamProcessor, GrpcToLogStreamGateway grpcToLogStreamGateway, ZeebeDb<ZbColumnFamilies> zeebeDb, LogStream logStream, ActorScheduler actorScheduler, RecordStreamSource recordStreamSource, ControlledActorClock controlledActorClock, EngineStateMonitor engineStateMonitor) {
        this.grpcServer = server;
        this.streamProcessor = streamProcessor;
        this.gateway = grpcToLogStreamGateway;
        this.database = zeebeDb;
        this.logStream = logStream;
        this.scheduler = actorScheduler;
        this.recordStream = recordStreamSource;
        this.clock = controlledActorClock;
        this.engineStateMonitor = engineStateMonitor;
    }

    /**
     * Starts the gRPC server and then opens the stream processor, blocking until the open call
     * has completed.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the gRPC server fails to start
     */
    public void start() {
        try {
            this.grpcServer.start();
            this.streamProcessor.openAsync(false).join();
        } catch (IOException e) {
            LOG.error("Failed starting in memory engine", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Shuts the engine down: the gRPC server is shut down and awaited first, then the stream
     * processor, the database and the log stream are closed, and finally the scheduler is
     * stopped. The first failure aborts the sequence.
     *
     * @throws RuntimeException wrapping whatever exception interrupted the shutdown sequence; if
     *     that exception is an {@link InterruptedException} the calling thread's interrupt flag
     *     is restored before rethrowing
     */
    public void stop() {
        try {
            this.grpcServer.shutdownNow();
            this.grpcServer.awaitTermination();
            this.streamProcessor.close();
            this.database.close();
            this.logStream.close();
            this.scheduler.stop();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping the exception would otherwise hide the interruption from callers
                // further up the stack, so put the flag back.
                Thread.currentThread().interrupt();
            }
            LOG.error("Failed stopping in memory engine", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the record stream source this engine was constructed with
     */
    public RecordStreamSource getRecordStreamSource() {
        return this.recordStream;
    }

    /**
     * Builds a plaintext client pointing at {@link #getGatewayAddress()}, with environment
     * variable overrides disabled.
     *
     * @return a newly built client; the caller owns it
     */
    public ZeebeClient createClient() {
        return ZeebeClient.newClientBuilder().applyEnvironmentVariableOverrides(false).gatewayAddress(getGatewayAddress()).usePlaintext().build();
    }

    /**
     * Same as {@link #createClient()}, but the client uses a JSON mapper backed by the given
     * Jackson {@link ObjectMapper}.
     *
     * @param objectMapper Jackson mapper wrapped in a {@link ZeebeObjectMapper} for the client
     * @return a newly built client; the caller owns it
     */
    public ZeebeClient createClient(ObjectMapper objectMapper) {
        return ZeebeClient.newClientBuilder().withJsonMapper(new ZeebeObjectMapper(objectMapper)).applyEnvironmentVariableOverrides(false).gatewayAddress(getGatewayAddress()).usePlaintext().build();
    }

    /**
     * @return the address reported by the gateway
     */
    public String getGatewayAddress() {
        return this.gateway.getAddress();
    }

    /**
     * Adds the given amount of time to the controlled clock.
     *
     * @param duration amount of time to add
     */
    public void increaseTime(Duration duration) {
        this.clock.addTime(duration);
    }

    /**
     * Blocks until the engine state monitor invokes the idle callback registered by this call, or
     * until the timeout elapses.
     *
     * @param duration maximum time to wait (millisecond precision)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the callback was not invoked within {@code duration}
     */
    public void waitForIdleState(Duration duration) throws InterruptedException, TimeoutException {
        // Resolve the timeout first so an invalid duration fails before a callback is registered.
        final long timeoutMillis = duration.toMillis();
        final CompletableFuture<Void> idleSignal = new CompletableFuture<>();
        this.engineStateMonitor.addOnIdleCallback(() -> {
            idleSignal.complete(null);
        });
        awaitSignal(idleSignal, timeoutMillis);
    }

    /**
     * Blocks until the engine state monitor invokes the processing callback registered by this
     * call, or until the timeout elapses.
     *
     * @param duration maximum time to wait (millisecond precision)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the callback was not invoked within {@code duration}
     */
    public void waitForBusyState(Duration duration) throws InterruptedException, TimeoutException {
        // Resolve the timeout first so an invalid duration fails before a callback is registered.
        final long timeoutMillis = duration.toMillis();
        final CompletableFuture<Void> busySignal = new CompletableFuture<>();
        this.engineStateMonitor.addOnProcessingCallback(() -> {
            busySignal.complete(null);
        });
        awaitSignal(busySignal, timeoutMillis);
    }

    /** Waits up to {@code timeoutMillis} milliseconds for {@code signal} to be completed. */
    private static void awaitSignal(CompletableFuture<Void> signal, long timeoutMillis) throws InterruptedException, TimeoutException {
        try {
            signal.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException ignored) {
            // Not expected: the futures passed in here are only ever completed normally via
            // complete(null), never exceptionally, so there is no failure cause to report.
        }
    }
}
