package org.apache.flink.statefun.examples.async;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.statefun.examples.async.events.TaskCompletionEvent;
import org.apache.flink.statefun.examples.async.events.TaskStartedEvent;
import org.apache.flink.statefun.examples.async.service.TaskQueryService;
import org.apache.flink.statefun.examples.async.service.TaskStatus;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/statefun/examples/async/TaskDurationTrackerFunction.class */
public final class TaskDurationTrackerFunction implements StatefulFunction {
    static final FunctionType TYPE = new FunctionType("org.apache.flink.statefun.examples.async", "duration-tracker");
    private final TaskQueryService service;

    /* JADX INFO: Access modifiers changed from: package-private */
    public TaskDurationTrackerFunction(TaskQueryService taskQueryService) {
        this.service = (TaskQueryService) Objects.requireNonNull(taskQueryService);
    }

    public void invoke(Context context, Object obj) {
        if (obj instanceof TaskStartedEvent) {
            context.registerAsyncOperation(obj, this.service.getTaskStatusAsync(((TaskStartedEvent) obj).getTaskId()));
        } else {
            if (!(obj instanceof AsyncOperationResult)) {
                throw new IllegalArgumentException("Unknown event " + obj);
            }
            onAsyncOperationResultEvent(context, (AsyncOperationResult) obj);
        }
    }

    private static void onAsyncOperationResultEvent(Context context, AsyncOperationResult<TaskStartedEvent, TaskStatus> asyncOperationResult) {
        TaskStartedEvent taskStartedEvent = (TaskStartedEvent) asyncOperationResult.metadata();
        if (!asyncOperationResult.successful()) {
            context.sendAfter(oneSecondPlusJitter(), context.self(), taskStartedEvent);
            return;
        }
        TaskStatus taskStatus = (TaskStatus) asyncOperationResult.value();
        if (taskStatus.isCompleted()) {
            handleCompletedTask(context, taskStartedEvent, taskStatus);
        } else {
            context.sendAfter(Duration.ofSeconds(10L), context.self(), taskStartedEvent);
        }
    }

    private static Duration oneSecondPlusJitter() {
        return Duration.ofMillis(ThreadLocalRandom.current().nextLong(1000L, 1250L));
    }

    private static void handleCompletedTask(Context context, TaskStartedEvent taskStartedEvent, TaskStatus taskStatus) {
        context.send(Constants.RESULT_EGRESS, new TaskCompletionEvent(taskStartedEvent.getTaskId(), taskStartedEvent.getStartTime(), taskStatus.getCompletionTime()));
    }
}
