package org.apache.james.task;

import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskManager;
import org.apache.james.task.TaskManagerWorker;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/task/MemoryTaskManager.class */
public class MemoryTaskManager implements TaskManager {
    private static final Duration UPDATE_INFORMATION_POLLING_DURATION = Duration.ofSeconds(5);
    private static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500);
    public static final Duration NOW = Duration.ZERO;
    private final Hostname hostname;
    private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails = new ConcurrentHashMap<>();
    private final TaskManagerWorker worker = new SerialTaskManagerWorker(updater(), UPDATE_INFORMATION_POLLING_DURATION);
    private final WorkQueue workQueue = new MemoryWorkQueue(this.worker);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/task/MemoryTaskManager$DetailsUpdater.class */
    public static class DetailsUpdater implements TaskManagerWorker.Listener {
        private final TaskExecutionDetailsUpdaterFactory updaterFactory;
        private final Hostname hostname;

        DetailsUpdater(TaskExecutionDetailsUpdaterFactory taskExecutionDetailsUpdaterFactory, Hostname hostname) {
            this.updaterFactory = taskExecutionDetailsUpdaterFactory;
            this.hostname = hostname;
        }

        @Override // org.apache.james.task.TaskManagerWorker.Listener
        public Publisher<Void> started(TaskId taskId) {
            return Mono.fromRunnable(() -> {
                this.updaterFactory.apply(taskId).accept(taskExecutionDetails -> {
                    return taskExecutionDetails.started(this.hostname);
                });
            });
        }

        @Override // org.apache.james.task.TaskManagerWorker.Listener
        public Publisher<Void> completed(TaskId taskId, Task.Result result, Optional<TaskExecutionDetails.AdditionalInformation> optional) {
            return Mono.fromRunnable(() -> {
                this.updaterFactory.apply(taskId).accept(taskExecutionDetails -> {
                    return taskExecutionDetails.completed(optional);
                });
            });
        }

        @Override // org.apache.james.task.TaskManagerWorker.Listener
        public Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> optional, String str, Throwable th) {
            return failed(taskId, optional);
        }

        @Override // org.apache.james.task.TaskManagerWorker.Listener
        public Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> optional, Throwable th) {
            return failed(taskId, optional);
        }

        @Override // org.apache.james.task.TaskManagerWorker.Listener
        public Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> optional) {
            return Mono.fromRunnable(() -> {
                this.updaterFactory.apply(taskId).accept(taskExecutionDetails -> {
                    return taskExecutionDetails.failed(optional);
                });
            });
        }

        @Override // org.apache.james.task.TaskManagerWorker.Listener
        public Publisher<Void> cancelled(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> optional) {
            return Mono.fromRunnable(() -> {
                this.updaterFactory.apply(taskId).accept(taskExecutionDetails -> {
                    return taskExecutionDetails.cancelEffectively(optional);
                });
            });
        }

        @Override // org.apache.james.task.TaskManagerWorker.Listener
        public Publisher<Void> updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation) {
            throw new IllegalStateException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/james/task/MemoryTaskManager$TaskExecutionDetailsUpdaterFactory.class */
    public interface TaskExecutionDetailsUpdaterFactory {
        Consumer<TaskExecutionDetailsUpdater> apply(TaskId taskId);
    }

    @Inject
    public MemoryTaskManager(Hostname hostname) {
        this.hostname = hostname;
    }

    public TaskId submit(Task task) {
        TaskId generateTaskId = TaskId.generateTaskId();
        this.idToExecutionDetails.put(generateTaskId, TaskExecutionDetails.from(task, generateTaskId, this.hostname));
        this.workQueue.submit(new TaskWithId(generateTaskId, task));
        return generateTaskId;
    }

    public TaskExecutionDetails getExecutionDetails(TaskId taskId) {
        return (TaskExecutionDetails) Optional.ofNullable(this.idToExecutionDetails.get(taskId)).orElseThrow(TaskNotFoundException::new);
    }

    public List<TaskExecutionDetails> list() {
        return ImmutableList.copyOf(this.idToExecutionDetails.values());
    }

    public List<TaskExecutionDetails> list(TaskManager.Status status) {
        return ImmutableList.copyOf(tasksFiltered(status).values());
    }

    private Map<TaskId, TaskExecutionDetails> tasksFiltered(TaskManager.Status status) {
        return (Map) this.idToExecutionDetails.entrySet().stream().filter(entry -> {
            return ((TaskExecutionDetails) entry.getValue()).getStatus().equals(status);
        }).collect(Guavate.entriesToImmutableMap());
    }

    public void cancel(TaskId taskId) {
        Optional.ofNullable(this.idToExecutionDetails.get(taskId)).ifPresent(taskExecutionDetails -> {
            updateDetails(taskId).accept(taskExecutionDetails -> {
                return taskExecutionDetails.cancelRequested(this.hostname);
            });
            this.workQueue.cancel(taskId);
        });
    }

    public TaskExecutionDetails await(TaskId taskId, Duration duration) throws TaskNotFoundException, TaskManager.ReachedTimeoutException {
        try {
            return (TaskExecutionDetails) Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic()).map(l -> {
                return getExecutionDetails(taskId);
            }).filter(taskExecutionDetails -> {
                return taskExecutionDetails.getStatus().isFinished();
            }).blockFirst(duration);
        } catch (IllegalStateException e) {
            throw new TaskManager.ReachedTimeoutException();
        }
    }

    @PreDestroy
    public void stop() {
        try {
            this.workQueue.close();
        } catch (IOException e) {
        }
    }

    private DetailsUpdater updater() {
        return new DetailsUpdater(this::updateDetails, this.hostname);
    }

    private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) {
        return taskExecutionDetailsUpdater -> {
            this.idToExecutionDetails.replace(taskId, taskExecutionDetailsUpdater.update(this.idToExecutionDetails.get(taskId)));
        };
    }
}
