package gobblin.runtime;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.AbstractScheduledService;
import gobblin.util.ParallelRunner;
import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/TaskStateCollectorService.class */
public class TaskStateCollectorService extends AbstractScheduledService {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class);
    private final JobState jobState;
    private final FileSystem fs;
    private final EventBus eventBus;
    private final int stateSerDeRunnerThreads;
    private final int outputTaskStatesCollectorIntervalSeconds;
    private final Path outputTaskStateDir;

    public TaskStateCollectorService(Properties properties, JobState jobState, EventBus eventBus, FileSystem fileSystem, Path path) {
        this.jobState = jobState;
        this.eventBus = eventBus;
        this.fs = fileSystem;
        this.outputTaskStateDir = path;
        this.stateSerDeRunnerThreads = Integer.parseInt(properties.getProperty("parallel.runner.threads", Integer.toString(10)));
        this.outputTaskStatesCollectorIntervalSeconds = Integer.parseInt(properties.getProperty("task.state.collector.interval.secs", Integer.toString(60)));
    }

    protected void runOneIteration() throws Exception {
        collectOutputTaskStates();
    }

    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedRateSchedule(this.outputTaskStatesCollectorIntervalSeconds, this.outputTaskStatesCollectorIntervalSeconds, TimeUnit.SECONDS);
    }

    protected void startUp() throws Exception {
        LOGGER.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
        super.startUp();
    }

    protected void shutDown() throws Exception {
        LOGGER.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
        try {
            runOneIteration();
        } finally {
            super.shutDown();
        }
    }

    private void collectOutputTaskStates() throws IOException {
        if (!this.fs.exists(this.outputTaskStateDir)) {
            LOGGER.warn(String.format("Output task state path %s does not exist", this.outputTaskStateDir));
            return;
        }
        FileStatus[] listStatus = this.fs.listStatus(this.outputTaskStateDir, new PathFilter() { // from class: gobblin.runtime.TaskStateCollectorService.1
            public boolean accept(Path path) {
                return path.getName().endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX);
            }
        });
        if (listStatus == null || listStatus.length == 0) {
            LOGGER.warn("No output task state files found in " + this.outputTaskStateDir);
            return;
        }
        ConcurrentLinkedQueue newConcurrentLinkedQueue = Queues.newConcurrentLinkedQueue();
        ParallelRunner parallelRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs);
        Throwable th = null;
        try {
            try {
                for (FileStatus fileStatus : listStatus) {
                    LOGGER.info("Found output task state file " + fileStatus.getPath());
                    parallelRunner.deserializeFromSequenceFile(Text.class, TaskState.class, fileStatus.getPath(), newConcurrentLinkedQueue, true);
                }
                if (parallelRunner != null) {
                    if (0 != 0) {
                        try {
                            parallelRunner.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        parallelRunner.close();
                    }
                }
                LOGGER.info(String.format("Collected task state of %d completed tasks", Integer.valueOf(newConcurrentLinkedQueue.size())));
                Iterator it = newConcurrentLinkedQueue.iterator();
                while (it.hasNext()) {
                    this.jobState.addTaskState((TaskState) it.next());
                }
                this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(newConcurrentLinkedQueue)));
            } finally {
            }
        } catch (Throwable th3) {
            if (parallelRunner != null) {
                if (th != null) {
                    try {
                        parallelRunner.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    parallelRunner.close();
                }
            }
            throw th3;
        }
    }
}
