/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe;

import io.camunda.client.CamundaClient;
import io.camunda.client.api.CamundaFuture;
import io.camunda.client.api.command.CompleteJobCommandStep1;
import io.camunda.client.api.worker.JobHandler;
import io.camunda.client.api.worker.JobWorker;
import io.camunda.client.api.worker.JobWorkerMetrics;
import io.camunda.zeebe.App;
import io.camunda.zeebe.ResponseChecker;
import io.camunda.zeebe.config.AppCfg;
import io.camunda.zeebe.config.WorkerCfg;
import io.camunda.zeebe.util.logging.ThrottledLogger;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Worker
extends App {
    public static final Logger LOGGER = LoggerFactory.getLogger(Worker.class);
    private static final Logger THROTTLED_LOGGER = new ThrottledLogger(LOGGER, Duration.ofSeconds(5L));
    private final WorkerCfg workerCfg;

    Worker(AppCfg config) {
        super(config);
        this.workerCfg = config.getWorker();
    }

    @Override
    public void run() {
        String jobType = this.workerCfg.getJobType();
        long completionDelay = this.workerCfg.getCompletionDelay().toMillis();
        boolean isStreamEnabled = this.workerCfg.isStreamEnabled();
        String variables = this.readVariables(this.workerCfg.getPayloadPath());
        ArrayBlockingQueue requestFutures = new ArrayBlockingQueue(10000);
        CamundaClient client = this.createCamundaClient();
        JobWorkerMetrics metrics = JobWorkerMetrics.micrometer().withMeterRegistry((MeterRegistry)this.registry).withTags((Iterable)Tags.of((String[])new String[]{"workerName", this.workerCfg.getWorkerName(), "jobType", jobType})).build();
        this.printTopology(client);
        JobWorker worker = client.newWorker().jobType(jobType).handler(this.handleJob(client, variables, completionDelay, requestFutures)).streamEnabled(isStreamEnabled).metrics(metrics).open();
        ResponseChecker responseChecker = new ResponseChecker(requestFutures);
        responseChecker.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            worker.close();
            client.close();
            responseChecker.close();
        }));
    }

    private JobHandler handleJob(CamundaClient client, String variables, long completionDelay, BlockingQueue<Future<?>> requestFutures) {
        return (jobClient, job) -> {
            String correlationKey;
            boolean messagePublishedSuccessfully;
            long startHandlingTime = System.currentTimeMillis();
            if (this.workerCfg.isSendMessage() && !(messagePublishedSuccessfully = this.publishMessage(client, correlationKey = job.getVariable(this.workerCfg.getCorrelationKeyVariableName()).toString()))) {
                return;
            }
            CompleteJobCommandStep1 command = jobClient.newCompleteCommand(job.getKey()).variables(variables);
            Worker.addDelayToCompletion(completionDelay, startHandlingTime);
            requestFutures.add((Future<?>)command.send());
        };
    }

    private boolean publishMessage(CamundaClient client, String correlationKey) {
        String messageName = this.workerCfg.getMessageName();
        LOGGER.debug("Publish message '{}' with correlation key '{}'", (Object)messageName, (Object)correlationKey);
        CamundaFuture messageSendFuture = client.newPublishMessageCommand().messageName(messageName).correlationKey(correlationKey).send();
        try {
            messageSendFuture.get(10L, TimeUnit.SECONDS);
            return true;
        }
        catch (Exception ex) {
            THROTTLED_LOGGER.error("Exception on publishing a message with name {} and correlationKey {}", new Object[]{messageName, correlationKey, ex});
            return false;
        }
    }

    private static void addDelayToCompletion(long completionDelay, long startHandlingTime) {
        try {
            long elapsedTime = System.currentTimeMillis() - startHandlingTime;
            if (elapsedTime < completionDelay) {
                long sleepTime = completionDelay - elapsedTime;
                LOGGER.debug("Sleep for {} ms", (Object)sleepTime);
                Thread.sleep(sleepTime);
            } else {
                LOGGER.debug("Skip sleep. Elapsed time {} is larger then {} completion delay.", (Object)elapsedTime, (Object)completionDelay);
            }
        }
        catch (Exception e) {
            THROTTLED_LOGGER.error("Exception on sleep with completion delay {}", (Object)completionDelay, (Object)e);
        }
    }

    private CamundaClient createCamundaClient() {
        WorkerCfg workerCfg = this.config.getWorker();
        Duration timeout = this.config.getWorker().getTimeout() != Duration.ZERO ? this.config.getWorker().getTimeout() : workerCfg.getCompletionDelay().multipliedBy(6L);
        return this.newClientBuilder().numJobWorkerExecutionThreads(workerCfg.getThreads()).defaultJobWorkerName(workerCfg.getWorkerName()).defaultJobTimeout(timeout).defaultJobWorkerMaxJobsActive(workerCfg.getCapacity()).defaultJobPollInterval(workerCfg.getPollingDelay()).build();
    }

    public static void main(String[] args) {
        Worker.createApp(Worker::new);
    }
}

