package org.apache.gobblin.cluster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/cluster/SingleTaskRunner.class */
class SingleTaskRunner {
    private static final Logger logger = LoggerFactory.getLogger(SingleTaskRunner.class);
    protected final String jobId;
    protected final String workUnitFilePath;
    protected final Config clusterConfig;
    private final Path appWorkPath;

    @VisibleForTesting
    SingleTask task;
    private TaskExecutor taskExecutor;
    private GobblinHelixTaskStateTracker taskStateTracker;
    private ServiceManager serviceManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SingleTaskRunner(String str, String str2, String str3) {
        this.jobId = str2;
        this.workUnitFilePath = str3;
        this.clusterConfig = ConfigFactory.parseFile(new File(str));
        this.appWorkPath = new Path(this.clusterConfig.getString(GobblinTaskRunner.CLUSTER_APP_WORK_DIR));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void run() throws IOException, InterruptedException {
        run(false);
    }

    void run(boolean z) throws IOException, InterruptedException {
        logger.info("SingleTaskRunner running.");
        startServices();
        runTask(z);
        shutdownServices();
    }

    @VisibleForTesting
    void startServices() {
        logger.info("SingleTaskRunner start services.");
        initServices();
        this.serviceManager.startAsync();
        try {
            this.serviceManager.awaitHealthy(10L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new GobblinClusterException("Timeout waiting for services to start.", e);
        }
    }

    private void shutdownServices() {
        logger.info("SingleTaskRunner shutting down services.");
        this.serviceManager.stopAsync();
        try {
            this.serviceManager.awaitStopped(1L, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
            logger.error("Timeout waiting for services to shutdown.", e);
        }
    }

    private void runTask(boolean z) throws IOException, InterruptedException {
        logger.info("SingleTaskRunner running task.");
        initClusterSingleTask(z);
        this.task.run();
    }

    void initClusterSingleTask(boolean z) throws IOException {
        FileSystem fileSystem = getFileSystem();
        StateStores stateStores = new StateStores(this.clusterConfig, this.appWorkPath, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, this.appWorkPath, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, this.appWorkPath, GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
        this.task = createSingleTaskHelper(getTaskAttemptBuilder(stateStores), fileSystem, stateStores, GobblinClusterUtils.getJobStateFilePath(stateStores.haveJobStateStore(), this.appWorkPath, this.jobId), z);
    }

    protected SingleTask createSingleTaskHelper(TaskAttemptBuilder taskAttemptBuilder, FileSystem fileSystem, StateStores stateStores, Path path, boolean z) throws IOException {
        return new SingleTask(this.jobId, new Path(this.workUnitFilePath), path, fileSystem, taskAttemptBuilder, stateStores, GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
    }

    private TaskAttemptBuilder getTaskAttemptBuilder(StateStores stateStores) {
        TaskAttemptBuilder taskAttemptBuilder = new TaskAttemptBuilder(this.taskStateTracker, this.taskExecutor);
        taskAttemptBuilder.setTaskStateStore(stateStores.getTaskStateStore());
        return taskAttemptBuilder;
    }

    private void initServices() {
        Properties configToProperties = ConfigUtils.configToProperties(this.clusterConfig);
        this.taskExecutor = new TaskExecutor(configToProperties);
        this.taskStateTracker = new GobblinHelixTaskStateTracker(configToProperties);
        this.serviceManager = new ServiceManager(Lists.newArrayList(new Service[]{this.taskExecutor, this.taskStateTracker}));
    }

    private FileSystem getFileSystem() throws IOException {
        Configuration newConfiguration = HadoopUtils.newConfiguration();
        return this.clusterConfig.hasPath("fs.uri") ? FileSystem.get(URI.create(this.clusterConfig.getString("fs.uri")), newConfiguration) : FileSystem.get(newConfiguration);
    }
}
