/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.statefun.examples.async;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.statefun.examples.async.Constants;
import org.apache.flink.statefun.examples.async.events.TaskCompletionEvent;
import org.apache.flink.statefun.examples.async.events.TaskStartedEvent;
import org.apache.flink.statefun.examples.async.service.TaskQueryService;
import org.apache.flink.statefun.examples.async.service.TaskStatus;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;

/**
 * Stateful function that, for each {@link TaskStartedEvent}, asynchronously asks the
 * {@link TaskQueryService} for the task's status and, once the task is reported as completed,
 * sends a {@link TaskCompletionEvent} to {@link Constants#RESULT_EGRESS}.
 *
 * <p>When the query does not succeed, or the task is not completed yet, the original
 * {@link TaskStartedEvent} is re-sent to {@code context.self()} after a delay, which triggers a
 * fresh query when it arrives.
 */
final class TaskDurationTrackerFunction
implements StatefulFunction {
    static final FunctionType TYPE = new FunctionType("org.apache.flink.statefun.examples.async", "duration-tracker");

    /** Delay before re-checking a task whose status says it is not completed yet. */
    private static final Duration NOT_COMPLETED_RETRY_DELAY = Duration.ofSeconds(10L);

    /** Lower bound (inclusive), in milliseconds, of the delay used after an unsuccessful query. */
    private static final long FAILED_QUERY_RETRY_MIN_MILLIS = 1000L;

    /** Upper bound (exclusive), in milliseconds, of the delay used after an unsuccessful query. */
    private static final long FAILED_QUERY_RETRY_MAX_MILLIS = 1250L;

    private final TaskQueryService service;

    /**
     * @param service the service used to look up task statuses
     * @throws NullPointerException if {@code service} is {@code null}
     */
    TaskDurationTrackerFunction(TaskQueryService service) {
        this.service = Objects.requireNonNull(service);
    }

    /**
     * Handles the two message kinds this function understands.
     *
     * <ul>
     *   <li>{@link TaskStartedEvent}: starts an asynchronous status query and registers it with
     *       the context, using the event itself as the operation's metadata.
     *   <li>{@link AsyncOperationResult}: processes the outcome of a previously registered query.
     * </ul>
     *
     * @param context the context of the current invocation
     * @param message the incoming message
     * @throws IllegalArgumentException if {@code message} is of any other type
     */
    public void invoke(Context context, Object message) {
        if (message instanceof TaskStartedEvent) {
            TaskStartedEvent e = (TaskStartedEvent) message;
            CompletableFuture<TaskStatus> result = this.service.getTaskStatusAsync(e.getTaskId());
            // Register the typed event (same object as `message`) so the metadata type is explicit.
            context.registerAsyncOperation(e, result);
            return;
        }
        if (message instanceof AsyncOperationResult) {
            // Unchecked but consistent with this class: the only async operation registered above
            // uses a TaskStartedEvent as metadata and a CompletableFuture<TaskStatus> as future.
            @SuppressWarnings("unchecked")
            AsyncOperationResult<TaskStartedEvent, TaskStatus> asyncOp =
                    (AsyncOperationResult<TaskStartedEvent, TaskStatus>) message;
            TaskDurationTrackerFunction.onAsyncOperationResultEvent(context, asyncOp);
            return;
        }
        throw new IllegalArgumentException("Unknown event " + message);
    }

    /**
     * Reacts to the outcome of a status query.
     *
     * <p>An unsuccessful operation and a not-yet-completed task both lead to the original event
     * being re-sent to {@code context.self()} after a delay; only a completed task is forwarded
     * to {@link #handleCompletedTask}.
     *
     * @param context the context of the current invocation
     * @param asyncOp the async operation result carrying the original event and the task status
     */
    private static void onAsyncOperationResultEvent(Context context, AsyncOperationResult<TaskStartedEvent, TaskStatus> asyncOp) {
        TaskStartedEvent e = asyncOp.metadata();
        if (!asyncOp.successful()) {
            // The query did not succeed: try again shortly, with jitter on the delay.
            Duration delay = TaskDurationTrackerFunction.oneSecondPlusJitter();
            context.sendAfter(delay, context.self(), e);
            return;
        }
        TaskStatus status = asyncOp.value();
        if (!status.isCompleted()) {
            // The task is not completed yet: check again later.
            context.sendAfter(NOT_COMPLETED_RETRY_DELAY, context.self(), e);
            return;
        }
        TaskDurationTrackerFunction.handleCompletedTask(context, e, status);
    }

    /**
     * @return a random duration of at least 1000 ms and strictly less than 1250 ms
     */
    private static Duration oneSecondPlusJitter() {
        long randomJitter = ThreadLocalRandom.current().nextLong(FAILED_QUERY_RETRY_MIN_MILLIS, FAILED_QUERY_RETRY_MAX_MILLIS);
        return Duration.ofMillis(randomJitter);
    }

    /**
     * Builds a {@link TaskCompletionEvent} from the task id and start time of the original event
     * and the completion time of the status, and sends it to {@link Constants#RESULT_EGRESS}.
     *
     * @param context the context of the current invocation
     * @param taskStartedEvent the event that started the tracking of this task
     * @param finishedTaskStatus the status reporting the task as completed
     */
    private static void handleCompletedTask(Context context, TaskStartedEvent taskStartedEvent, TaskStatus finishedTaskStatus) {
        TaskCompletionEvent taskCompletionEvent = new TaskCompletionEvent(taskStartedEvent.getTaskId(), taskStartedEvent.getStartTime(), finishedTaskStatus.getCompletionTime());
        // No upcast to Object: keep the static type so the egress send stays type-checked.
        context.send(Constants.RESULT_EGRESS, taskCompletionEvent);
    }
}

