package gobblin.runtime;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import gobblin.annotation.Alpha;
import gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import gobblin.broker.gobblin_scopes.TaskScopeInstance;
import gobblin.broker.iface.SharedResourcesBroker;
import gobblin.commit.CommitStep;
import gobblin.configuration.WorkUnitState;
import gobblin.metastore.StateStore;
import gobblin.metrics.event.EventSubmitter;
import gobblin.runtime.task.TaskFactory;
import gobblin.runtime.task.TaskIFaceWrapper;
import gobblin.runtime.task.TaskUtils;
import gobblin.runtime.util.JobMetrics;
import gobblin.source.workunit.WorkUnit;
import gobblin.util.ExecutorsUtils;
import gobblin.util.executors.IteratorExecutor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a batch of {@link WorkUnit}s as {@link Task}s in a single attempt, waits for the
 * submitted tasks to finish and, depending on the {@link CommitPolicy}, commits them.
 *
 * <p>An attempt may be associated with a container id and a task state store; both are
 * optional. When a task state store is present, {@link #commit()} persists the state of
 * every task to it.
 */
@Alpha
public class GobblinMultiTaskAttempt {
    /** Per-attempt logger; its name is suffixed with the container id (or "noattempt"). */
    private final Logger log;
    private final Iterator<WorkUnit> workUnits;
    private final String jobId;
    private final JobState jobState;
    private final TaskStateTracker taskStateTracker;
    private final TaskExecutor taskExecutor;
    private final Optional<String> containerIdOptional;
    private final Optional<StateStore<TaskState>> taskStateStoreOptional;
    private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
    /** Tasks created by {@link #run()}; {@code null} until {@link #run()} has assigned it. */
    private List<Task> tasks;
    /** Extra steps executed at the end of {@link #commit()}; {@code null} until one is added. */
    private List<CommitStep> cleanupCommitSteps;

    /** What {@link #runAndOptionallyCommitTaskAttempt(CommitPolicy)} does once the tasks have run. */
    public enum CommitPolicy {
        /** {@link #commit()} is invoked right after {@link #run()}. */
        IMMEDIATE,
        /**
         * {@link #commit()} is not invoked by this class; the attempt is only checked for
         * being safe for speculative execution.
         */
        CUSTOMIZED
    }

    /**
     * Creates an attempt. Nothing is executed until {@link #run()} is called.
     *
     * @param workUnits            work units to turn into tasks
     * @param jobId                id of the owning job
     * @param jobState             job state used to build each task's {@link WorkUnitState}
     * @param taskStateTracker     tracker every created task is registered with
     * @param taskExecutor         executor the tasks are submitted to
     * @param containerIdOptional  optional container id, used in log messages and as task attempt id
     * @param taskStateStoreOptional optional store the task states are written to on commit
     * @param jobBroker            job-level broker from which a sub-scoped builder is created per task
     */
    public GobblinMultiTaskAttempt(Iterator<WorkUnit> workUnits, String jobId, JobState jobState, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, Optional<String> containerIdOptional, Optional<StateStore<TaskState>> taskStateStoreOptional, SharedResourcesBroker<GobblinScopeTypes> jobBroker) {
        this.workUnits = workUnits;
        this.jobId = jobId;
        this.jobState = jobState;
        this.taskStateTracker = taskStateTracker;
        this.taskExecutor = taskExecutor;
        this.containerIdOptional = containerIdOptional;
        this.taskStateStoreOptional = taskStateStoreOptional;
        this.log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName() + "-" + containerIdOptional.or("noattempt"));
        this.jobBroker = jobBroker;
    }

    /**
     * Submits one task per work unit and blocks until the latch shared with the tasks
     * reaches zero. If there are no work units, an empty task list is recorded and the
     * method returns immediately.
     *
     * @throws IOException          declared as part of the method's contract
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void run() throws IOException, InterruptedException {
        if (!this.workUnits.hasNext()) {
            this.log.warn("No work units to run in container " + containerIdOrEmpty());
            this.tasks = new ArrayList<Task>();
            return;
        }

        CountUpAndDownLatch runningTasksLatch = new CountUpAndDownLatch(0);
        this.tasks = runWorkUnits(runningTasksLatch);

        this.log.info("Waiting for submitted tasks of job {} to complete in container {}...", this.jobId, containerIdOrEmpty());
        // Wake up every 10 seconds to report progress until the latch is released.
        while (runningTasksLatch.getCount() > 0) {
            this.log.info(String.format("%d out of %d tasks of job %s are running in container %s", runningTasksLatch.getCount(), runningTasksLatch.getRegisteredParties(), this.jobId, containerIdOrEmpty()));
            if (runningTasksLatch.await(10L, TimeUnit.SECONDS)) {
                break;
            }
        }
        this.log.info("All assigned tasks of job {} have completed in container {}", this.jobId, containerIdOrEmpty());
    }

    /**
     * Commits all tasks using a dedicated thread pool, then persists the task states and
     * runs the registered cleanup commit steps. Persisting and cleanup happen exactly once,
     * whether or not committing the tasks succeeded.
     *
     * @throws IOException      if persisting task states or a cleanup step fails, or if at
     *                          least one task ended in the FAILED working state
     * @throws RuntimeException wrapping an {@link InterruptedException} if committing was
     *                          interrupted; the thread's interrupt flag is restored first
     */
    public void commit() throws IOException {
        if (this.tasks == null || this.tasks.isEmpty()) {
            this.log.warn("No tasks to be committed in container " + containerIdOrEmpty());
            return;
        }

        Iterator<Callable<Void>> commitCalls = Iterators.transform(this.tasks.iterator(), new Function<Task, Callable<Void>>() {
            @Override
            public Callable<Void> apply(final Task task) {
                return new Callable<Void>() {
                    @Override
                    @Nullable
                    public Void call() throws Exception {
                        task.commit();
                        return null;
                    }
                };
            }
        });

        try {
            IteratorExecutor.logFailures(new IteratorExecutor<Void>(commitCalls, getTaskCommitThreadPoolSize(), ExecutorsUtils.newDaemonThreadFactory(Optional.of(this.log), Optional.of("Task-committing-pool-%d"))).executeAndGetResults(), this.log, 10);
        } catch (InterruptedException e) {
            this.log.error("Committing of tasks interrupted. Aborting.");
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            // A finally block (rather than duplicating this code in a catch-all handler)
            // guarantees that states are written and cleanup steps executed only once.
            persistTaskStateStore();
            executeCleanupCommitSteps();
        }
    }

    /**
     * Requests shutdown of every task and then waits up to 1000 (as passed to
     * {@code Task#awaitShutdown}) for each of them. Does nothing besides logging if
     * {@link #run()} has not created any tasks yet.
     *
     * @throws InterruptedException if interrupted while waiting for a task to shut down
     */
    public void shutdownTasks() throws InterruptedException {
        this.log.info("Shutting down tasks");
        if (this.tasks == null) {
            // run() has not been invoked yet, so there is nothing to shut down.
            return;
        }
        // Signal all tasks first so they can wind down in parallel, then wait for each.
        for (Task task : this.tasks) {
            task.shutdown();
        }
        for (Task task : this.tasks) {
            task.awaitShutdown(1000L);
        }
    }

    /**
     * Writes the state of every task to the task state store, replacing any existing entry,
     * and fails if at least one task is in the FAILED working state.
     *
     * @throws IOException if the store cannot be accessed or if any task failed
     */
    private void persistTaskStateStore() throws IOException {
        if (!this.taskStateStoreOptional.isPresent()) {
            this.log.info("Task state store does not exist.");
            return;
        }

        StateStore<TaskState> taskStateStore = this.taskStateStoreOptional.get();

        // Remove stale entries for all tasks before writing any new state.
        for (Task task : this.tasks) {
            String tableName = task.getTaskId() + AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX;
            if (taskStateStore.exists(this.jobId, tableName)) {
                taskStateStore.delete(this.jobId, tableName);
            }
        }

        boolean anyTaskFailed = false;
        for (Task task : this.tasks) {
            this.log.info("Writing task state for task " + task.getTaskId());
            taskStateStore.put(task.getJobId(), task.getTaskId() + AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX, task.getTaskState());
            if (task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.FAILED) {
                anyTaskFailed = true;
            }
        }

        if (anyTaskFailed) {
            for (Task task : this.tasks) {
                if (task.getTaskState().contains("task.failure.exception")) {
                    this.log.error(String.format("Task %s failed due to exception: %s", task.getTaskId(), task.getTaskState().getProp("task.failure.exception")));
                }
            }
            throw new IOException(String.format("Not all tasks running in container %s completed successfully", containerIdOrEmpty()));
        }
    }

    /**
     * Tells whether every task of this attempt reports being safe for speculative execution.
     *
     * @return {@code false} as soon as one task is not safe; {@code true} otherwise,
     *         including when there are no tasks
     */
    public boolean isSpeculativeExecutionSafe() {
        if (this.tasks != null) {
            for (Task task : this.tasks) {
                if (!task.isSpeculativeExecutionSafe()) {
                    this.log.info("One task is not safe for speculative execution.");
                    return false;
                }
            }
        }
        this.log.info("All tasks are safe for speculative execution.");
        return true;
    }

    /** Reads the commit thread pool size from the "taskexecutor.threadpool.size" job property (default 2). */
    private int getTaskCommitThreadPoolSize() {
        return Integer.parseInt(this.jobState.getProp("taskexecutor.threadpool.size", Integer.toString(2)));
    }

    /**
     * Registers a step to be executed at the end of {@link #commit()}, after task states
     * have been persisted. Steps run in registration order.
     *
     * @param commitStep the step to register
     */
    public void addCleanupCommitStep(CommitStep commitStep) {
        if (this.cleanupCommitSteps == null) {
            this.cleanupCommitSteps = new ArrayList<CommitStep>();
        }
        this.cleanupCommitSteps.add(commitStep);
    }

    /** Executes the registered cleanup commit steps, if any, in registration order. */
    private void executeCleanupCommitSteps() throws IOException {
        if (this.cleanupCommitSteps == null) {
            return;
        }
        for (CommitStep cleanupStep : this.cleanupCommitSteps) {
            this.log.info("Executing additional commit step.");
            cleanupStep.execute();
        }
    }

    /** Returns the container id, or the empty string when none was supplied. */
    private String containerIdOrEmpty() {
        return this.containerIdOptional.or("");
    }

    /**
     * Creates, registers and submits one task per remaining work unit, counting the latch
     * up once per task, and finally emits a "TasksSubmitted" event carrying the task count.
     *
     * @param countUpAndDownLatch latch shared with the tasks; incremented for each work unit
     * @return the submitted tasks, in submission order
     */
    private List<Task> runWorkUnits(CountUpAndDownLatch countUpAndDownLatch) {
        List<Task> submittedTasks = Lists.newArrayList();
        while (this.workUnits.hasNext()) {
            WorkUnit workUnit = this.workUnits.next();
            // Count up before the task is submitted so the latch already accounts for it.
            countUpAndDownLatch.countUp();

            String taskId = workUnit.getProp("task.id");
            WorkUnitState workUnitState = new WorkUnitState(workUnit, this.jobState, this.jobBroker.newSubscopedBuilder(new TaskScopeInstance(taskId)));
            workUnitState.setId(taskId);
            workUnitState.setProp("job.id", this.jobId);
            workUnitState.setProp("task.id", taskId);
            if (this.containerIdOptional.isPresent()) {
                workUnitState.setProp("task.AttemptId", this.containerIdOptional.get());
            }

            Task task = createTaskRunnable(workUnitState, countUpAndDownLatch);
            this.taskStateTracker.registerNewTask(task);
            submittedTasks.add(task);
            this.taskExecutor.execute(task);
        }

        new EventSubmitter.Builder(JobMetrics.get(this.jobId).getMetricContext(), "gobblin.runtime").build().submit("TasksSubmitted", new String[]{"tasksCount", Long.toString(countUpAndDownLatch.getRegisteredParties())});
        return submittedTasks;
    }

    /**
     * Builds the task for a work unit: a {@link TaskIFaceWrapper} around a factory-created
     * task when the work unit state specifies a {@link TaskFactory}, a plain {@link Task}
     * otherwise.
     */
    private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch countDownLatch) {
        Optional<TaskFactory> taskFactory = TaskUtils.getTaskFactory(workUnitState);
        if (taskFactory.isPresent()) {
            return new TaskIFaceWrapper(taskFactory.get().createTask(new TaskContext(workUnitState)), new TaskContext(workUnitState), countDownLatch, this.taskStateTracker);
        }
        return new Task(new TaskContext(workUnitState), this.taskStateTracker, this.taskExecutor, Optional.of(countDownLatch));
    }

    /**
     * Runs the tasks and then applies the given commit policy: with
     * {@link CommitPolicy#IMMEDIATE} the tasks are committed right away; otherwise the
     * attempt must be safe for speculative execution.
     *
     * @param commitPolicy the policy to apply after the tasks have run
     * @throws IOException          if running or committing fails
     * @throws InterruptedException if interrupted while waiting for the tasks
     * @throws RuntimeException     if the policy is not IMMEDIATE and a task is not safe
     *                              for speculative execution
     */
    public void runAndOptionallyCommitTaskAttempt(CommitPolicy commitPolicy) throws IOException, InterruptedException {
        run();
        if (commitPolicy.equals(CommitPolicy.IMMEDIATE)) {
            this.log.info("Will commit tasks directly.");
            commit();
        } else if (!isSpeculativeExecutionSafe()) {
            throw new RuntimeException("Speculative execution is enabled. However, the task context is not safe for speculative execution.");
        }
    }

    /**
     * Creates an attempt without container id or task state store from the given job
     * context, runs it and applies the commit policy.
     *
     * @param jobContext       source of the job id, job state and job broker
     * @param workUnits        work units to run
     * @param taskStateTracker tracker the tasks are registered with
     * @param taskExecutor     executor the tasks are submitted to
     * @param commitPolicy     policy applied after the tasks have run
     * @return the attempt that was run
     * @throws IOException          if running or committing fails
     * @throws InterruptedException if interrupted while waiting for the tasks
     */
    public static GobblinMultiTaskAttempt runWorkUnits(JobContext jobContext, Iterator<WorkUnit> workUnits, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, CommitPolicy commitPolicy) throws IOException, InterruptedException {
        GobblinMultiTaskAttempt attempt = new GobblinMultiTaskAttempt(workUnits, jobContext.getJobId(), jobContext.getJobState(), taskStateTracker, taskExecutor, Optional.<String>absent(), Optional.<StateStore<TaskState>>absent(), jobContext.getJobBroker());
        attempt.runAndOptionallyCommitTaskAttempt(commitPolicy);
        return attempt;
    }

    /**
     * Creates an attempt bound to a container id and a task state store, runs it and
     * applies the commit policy.
     *
     * @param jobId            id of the owning job
     * @param containerId      id of the container the attempt runs in
     * @param jobState         job state used to build each task's work unit state
     * @param workUnits        work units to run
     * @param taskStateTracker tracker the tasks are registered with
     * @param taskExecutor     executor the tasks are submitted to
     * @param taskStateStore   store the task states are written to on commit
     * @param commitPolicy     policy applied after the tasks have run
     * @param jobBroker        job-level broker
     * @return the attempt that was run
     * @throws IOException          if running or committing fails
     * @throws InterruptedException if interrupted while waiting for the tasks
     */
    public static GobblinMultiTaskAttempt runWorkUnits(String jobId, String containerId, JobState jobState, List<WorkUnit> workUnits, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, StateStore<TaskState> taskStateStore, CommitPolicy commitPolicy, SharedResourcesBroker<GobblinScopeTypes> jobBroker) throws IOException, InterruptedException {
        GobblinMultiTaskAttempt attempt = new GobblinMultiTaskAttempt(workUnits.iterator(), jobId, jobState, taskStateTracker, taskExecutor, Optional.of(containerId), Optional.of(taskStateStore), jobBroker);
        attempt.runAndOptionallyCommitTaskAttempt(commitPolicy);
        return attempt;
    }
}
