package gobblin.runtime;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.io.Closer;
import com.google.inject.Guice;
import com.google.inject.Module;
import gobblin.commit.CommitSequence;
import gobblin.commit.CommitSequenceStore;
import gobblin.commit.CommitStep;
import gobblin.commit.DeliverySemantics;
import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.metastore.JobHistoryStore;
import gobblin.metastore.MetaStoreModule;
import gobblin.metrics.GobblinMetrics;
import gobblin.publisher.CommitSequencePublisher;
import gobblin.publisher.DataPublisher;
import gobblin.publisher.UnpublishedHandling;
import gobblin.runtime.JobState;
import gobblin.runtime.commit.DatasetStateCommitStep;
import gobblin.runtime.commit.FsCommitSequenceStore;
import gobblin.runtime.util.JobMetrics;
import gobblin.source.Source;
import gobblin.source.extractor.JobCommitPolicy;
import gobblin.util.HadoopUtils;
import gobblin.util.JobLauncherUtils;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/JobContext.class */
public class JobContext {
    // Class-level logger; used alongside the per-job logger held in the "logger" field.
    private static final Logger LOG = LoggerFactory.getLogger(JobContext.class);
    // Sub-directory names appended to task.data.root.dir for writer staging and output data.
    private static final String TASK_STAGING_DIR_NAME = "task-staging";
    private static final String TASK_OUTPUT_DIR_NAME = "task-output";
    // Value of the mandatory job.name property.
    private final String jobName;
    // Value of job.id, or a freshly generated ID when the property is not set.
    private final String jobId;
    // Mutable state of this job run, seeded with the job properties and the latest dataset states.
    private final JobState jobState;
    // Commit policy resolved from the job properties.
    private final JobCommitPolicy jobCommitPolicy;
    // Parsed from job.lock.enabled; defaults to true.
    private final boolean jobLockEnabled;
    // Present only when GobblinMetrics.isEnabled(properties) is true.
    private final Optional<JobMetrics> jobMetricsOptional;
    // The configured source (source.class), wrapped in a SourceDecorator.
    private final Source<?, ?> source;
    // Store for dataset states; a NoopDatasetStateStore when state.store.enabled is false.
    private final FsDatasetStateStore datasetStateStore;
    // Present only when job.history.store.enabled is true.
    private final Optional<JobHistoryStore> jobHistoryStoreOptional;
    // Delivery semantics parsed from the job state.
    private final DeliverySemantics semantics;
    // Present only when the delivery semantics is EXACTLY_ONCE.
    private final Optional<CommitSequenceStore> commitSequenceStore;
    // Logger supplied by the caller of the constructor.
    private final Logger logger;
    // Dataset states keyed by dataset URN; absent until commit() has been called.
    private Optional<Map<String, JobState.DatasetState>> datasetStatesByUrns = Optional.absent();

    /**
     * Builds the context of a single job run from the job configuration.
     *
     * <p>The given properties are modified: {@code job.id} is always written back so that a
     * generated ID becomes visible to the caller.
     *
     * @param properties job configuration; must contain {@code job.name}
     * @param logger logger used for job-level messages and handed to the source decorator
     * @throws Exception if the state store file system cannot be obtained, the previous dataset
     *         states cannot be loaded, or the class named by {@code source.class} cannot be
     *         loaded or instantiated
     */
    public JobContext(Properties properties, Logger logger) throws Exception {
        Preconditions.checkArgument(properties.containsKey("job.name"), "A job must have a job name specified by job.name");
        this.jobName = properties.getProperty("job.name");
        if (properties.containsKey("job.id")) {
            this.jobId = properties.getProperty("job.id");
        } else {
            this.jobId = JobLauncherUtils.newJobId(this.jobName);
        }
        properties.setProperty("job.id", this.jobId);

        this.jobCommitPolicy = JobCommitPolicy.getCommitPolicy(properties);
        this.jobLockEnabled = Boolean.parseBoolean(properties.getProperty("job.lock.enabled", Boolean.TRUE.toString()));

        // Mirror every job property into the Hadoop configuration used for the state store.
        Configuration hadoopConf = new Configuration();
        for (String key : properties.stringPropertyNames()) {
            hadoopConf.set(key, properties.getProperty(key));
        }
        URI stateStoreFsUri = URI.create(properties.getProperty("state.store.fs.uri", "file:///"));
        FileSystem stateStoreFs = FileSystem.get(stateStoreFsUri, hadoopConf);
        String stateStoreDir = properties.getProperty("state.store.dir");

        // The state store is enabled unless state.store.enabled is explicitly set to a non-true value.
        boolean stateStoreEnabled = !properties.containsKey("state.store.enabled")
                || Boolean.parseBoolean(properties.getProperty("state.store.enabled"));
        if (stateStoreEnabled) {
            this.datasetStateStore = new FsDatasetStateStore(stateStoreFs, stateStoreDir);
        } else {
            this.datasetStateStore = new NoopDatasetStateStore(stateStoreFs, stateStoreDir);
        }

        boolean historyStoreEnabled = Boolean.parseBoolean(properties.getProperty("job.history.store.enabled", Boolean.FALSE.toString()));
        if (historyStoreEnabled) {
            JobHistoryStore historyStore = Guice.createInjector(new MetaStoreModule(properties)).getInstance(JobHistoryStore.class);
            this.jobHistoryStoreOptional = Optional.of(historyStore);
        } else {
            this.jobHistoryStoreOptional = Optional.absent();
        }

        State jobProps = new State();
        jobProps.addAll(properties);
        this.jobState = new JobState(jobProps, this.datasetStateStore.getLatestDatasetStatesByUrns(this.jobName), this.jobName, this.jobId);
        setTaskStagingAndOutputDirs();

        if (GobblinMetrics.isEnabled(properties)) {
            this.jobMetricsOptional = Optional.of(JobMetrics.get(this.jobState));
            this.jobState.setProp("metrics.context.name", this.jobMetricsOptional.get().getName());
        } else {
            this.jobMetricsOptional = Optional.absent();
        }

        this.semantics = DeliverySemantics.parse(this.jobState);
        this.commitSequenceStore = createCommitSequenceStore();

        Object sourceInstance = Class.forName(properties.getProperty("source.class")).newInstance();
        this.source = new SourceDecorator((Source) Source.class.cast(sourceInstance), this.jobId, logger);
        this.logger = logger;
    }

    /**
     * Creates the commit sequence store used for exactly-once delivery.
     *
     * <p>The file system handed to the store is intentionally NOT closed here: the store keeps
     * using it after this method returns, and {@code FileSystem.get} may return a cached instance
     * shared with other components. The previous implementation closed it on every path, leaving
     * the returned store with a closed file system.
     *
     * @return the store when the delivery semantics is {@code EXACTLY_ONCE}, otherwise absent
     * @throws IOException if the file system for the store cannot be obtained
     * @throws IllegalStateException if exactly-once delivery is requested but the commit sequence
     *         store directory is not configured
     */
    private Optional<CommitSequenceStore> createCommitSequenceStore() throws IOException {
        if (this.semantics != DeliverySemantics.EXACTLY_ONCE) {
            return Optional.absent();
        }
        Preconditions.checkState(this.jobState.contains(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_DIR));

        URI storeFsUri = URI.create(this.jobState.getProp(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_FS_URI, "file:///"));
        FileSystem storeFs = FileSystem.get(storeFsUri, HadoopUtils.getConfFromState(this.jobState));
        Path storeDir = new Path(this.jobState.getProp(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_DIR));

        CommitSequenceStore store = new FsCommitSequenceStore(storeFs, storeDir);
        return Optional.of(store);
    }

    /**
     * @return the job name taken from the {@code job.name} property
     */
    public String getJobName() {
        return jobName;
    }

    /**
     * @return the ID of this job run (configured via {@code job.id} or generated)
     */
    public String getJobId() {
        return jobId;
    }

    /**
     * @return the mutable state object of this job run (not a copy)
     */
    public JobState getJobState() {
        return jobState;
    }

    /**
     * @return the job metrics, or absent when metrics were not enabled at construction time
     */
    public Optional<JobMetrics> getJobMetricsOptional() {
        return jobMetricsOptional;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the decorated source instantiated from the {@code source.class} property
     */
    public Source<?, ?> getSource() {
        return source;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the value of {@code job.lock.enabled}, which defaults to {@code true}
     */
    public boolean isJobLockEnabled() {
        return jobLockEnabled;
    }

    /**
     * Makes the task data root directory job-specific by appending the job ID, then derives the
     * writer staging and output directories from it. Only logs a warning when
     * {@code task.data.root.dir} is not configured.
     */
    private void setTaskStagingAndOutputDirs() {
        if (this.jobState.contains("task.data.root.dir")) {
            Path jobRootDir = new Path(this.jobState.getProp("task.data.root.dir"), this.jobId);
            this.jobState.setProp("task.data.root.dir", jobRootDir.toString());
            setTaskStagingDir();
            setTaskOutputDir();
        } else {
            LOG.warn("Property task.data.root.dir is missing.");
        }
    }

    /**
     * Sets {@code writer.staging.dir} to the staging sub-directory of the task data root
     * directory, unless the property is already set, in which case a deprecation warning is
     * logged and the existing value is kept.
     */
    private void setTaskStagingDir() {
        if (!this.jobState.contains("writer.staging.dir")) {
            Path stagingDir = new Path(this.jobState.getProp("task.data.root.dir"), TASK_STAGING_DIR_NAME);
            this.jobState.setProp("writer.staging.dir", stagingDir.toString());
            return;
        }
        LOG.warn(String.format("Property %s is deprecated. No need to use it if %s is specified.", "writer.staging.dir", "task.data.root.dir"));
    }

    /**
     * Sets {@code writer.output.dir} to the output sub-directory of the task data root
     * directory, unless the property is already set, in which case a deprecation warning is
     * logged and the existing value is kept.
     */
    private void setTaskOutputDir() {
        if (!this.jobState.contains("writer.output.dir")) {
            Path outputDir = new Path(this.jobState.getProp("task.data.root.dir"), TASK_OUTPUT_DIR_NAME);
            this.jobState.setProp("writer.output.dir", outputDir.toString());
            return;
        }
        LOG.warn(String.format("Property %s is deprecated. No need to use it if %s is specified.", "writer.output.dir", "task.data.root.dir"));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the value of {@code cleanup.staging.data.per.task} in the job state, which
     *         defaults to {@code true}
     */
    public boolean shouldCleanupStagingDataPerTask() {
        final boolean cleanupPerTask = this.jobState.getPropAsBoolean("cleanup.staging.data.per.task", true);
        return cleanupPerTask;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns an immutable snapshot of the dataset states keyed by dataset URN.
     *
     * @return a copy of the dataset states created by {@code commit()}, or an empty map when
     *         {@code commit()} has not been called yet
     */
    public Map<String, JobState.DatasetState> getDatasetStatesByUrns() {
        Map<String, JobState.DatasetState> states =
                this.datasetStatesByUrns.or(Maps.<String, JobState.DatasetState>newHashMap());
        return ImmutableMap.copyOf(states);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Writes the current job execution information to the job history store, if one is
     * configured. A failure to write is logged and otherwise ignored.
     */
    public void storeJobExecutionInfo() {
        if (!this.jobHistoryStoreOptional.isPresent()) {
            return;
        }
        JobHistoryStore historyStore = this.jobHistoryStoreOptional.get();
        try {
            this.logger.info("Writing job execution information to the job history store");
            historyStore.put(this.jobState.toJobExecutionInfo());
        } catch (IOException e) {
            this.logger.error("Failed to write job execution information to the job history store: " + e, e);
        }
    }

    /**
     * Event-bus callback invoked when additional tasks have completed; logs the number of newly
     * completed tasks and refreshes the job execution information in the job history store.
     *
     * @param newTaskCompletionEvent event carrying the states of the newly completed tasks
     */
    @Subscribe
    public void handleNewTaskCompletionEvent(NewTaskCompletionEvent newTaskCompletionEvent) {
        int completedTaskCount = newTaskCompletionEvent.getTaskStates().size();
        LOG.info("{} more tasks of job {} have completed", completedTaskCount, this.jobId);
        storeJobExecutionInfo();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records the end time and duration of the job and copies the {@code fork.branches} value of
     * the task states into the job state. When task states disagree, the value of the last task
     * state iterated wins.
     */
    public void finalizeJobStateBeforeCommit() {
        this.jobState.setEndTime(System.currentTimeMillis());
        long elapsedMillis = this.jobState.getEndTime() - this.jobState.getStartTime();
        this.jobState.setDuration(elapsedMillis);

        for (TaskState taskState : this.jobState.getTaskStates()) {
            int forkBranches = taskState.getPropAsInt("fork.branches", 1);
            this.jobState.setProp("fork.branches", forkBranches);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Commits every dataset of the job and updates the job state accordingly.
     *
     * <p>All datasets are processed even if an earlier one fails; the job state becomes
     * {@code COMMITTED} only when every dataset was committed successfully.
     *
     * @throws IOException if at least one dataset could not be committed (the job state is set
     *         to {@code FAILED} first), or if processing a dataset throws
     */
    public void commit() throws IOException {
        Map<String, JobState.DatasetState> statesByUrn = this.jobState.createDatasetStatesByUrns();
        this.datasetStatesByUrns = Optional.of(statesByUrn);

        boolean commitInJob = shouldCommitDataInJob(this.jobState);
        DeliverySemantics deliverySemantics = DeliverySemantics.parse(this.jobState);
        if (!commitInJob) {
            this.logger.info("Job will not commit data since data are committed by tasks.");
        }

        boolean allCommitted = true;
        for (Map.Entry<String, JobState.DatasetState> entry : statesByUrn.entrySet()) {
            // Evaluate the commit first so a previous failure never skips the remaining datasets.
            boolean committed = processDatasetCommit(commitInJob, deliverySemantics, entry.getKey(), entry.getValue());
            allCommitted = allCommitted && committed;
        }

        if (allCommitted) {
            this.jobState.setState(JobState.RunningState.COMMITTED);
            return;
        }
        this.jobState.setState(JobState.RunningState.FAILED);
        throw new IOException("Failed to commit dataset state for some dataset(s) of job " + this.jobId);
    }

    /**
     * Commits a single dataset and persists its resulting state.
     *
     * <p>Steps:
     * <ol>
     *   <li>Derive the dataset's running state from its task states and the commit policy.</li>
     *   <li>Resolve the data publisher class. If the commit policy forbids committing the
     *       dataset, give the publisher a chance to handle unpublished work units and stop.</li>
     *   <li>Publish the data: build a commit sequence for exactly-once delivery, otherwise
     *       publish directly through the data publisher. When data are committed by tasks, only
     *       promote a {@code SUCCESSFUL} dataset to {@code COMMITTED}.</li>
     *   <li>Always finalize the dataset state, then either execute the commit sequence (which
     *       includes the dataset state commit step) or persist the dataset state directly.</li>
     * </ol>
     *
     * <p>The commit sequence builder returned by {@code generateCommitSequenceBuilder} is now
     * retained; it used to be discarded, so the commit sequence was never executed.
     *
     * @param z whether the job, rather than the tasks, commits the data
     * @param deliverySemantics delivery semantics of the job
     * @param str URN of the dataset being committed
     * @param datasetState state of the dataset being committed; updated in place
     * @return {@code true} if the dataset was committed and its state persisted, else {@code false}
     * @throws IOException if handling unpublished work units fails or a publisher cannot be closed
     *         while the dataset is being skipped
     */
    boolean processDatasetCommit(boolean z, DeliverySemantics deliverySemantics, String str, JobState.DatasetState datasetState) throws IOException {
        finalizeDatasetStateBeforeCommit(datasetState);

        Class<? extends DataPublisher> publisherClass;
        try (Closer closer = Closer.create()) {
            Optional<Class<? extends DataPublisher>> configuredClass = getJobDataPublisherClass(this.jobState);
            @SuppressWarnings("unchecked") // the default publisher is expected to be a DataPublisher
            Class<? extends DataPublisher> defaultClass =
                    (Class<? extends DataPublisher>) Class.forName("gobblin.publisher.BaseDataPublisher");
            publisherClass = configuredClass.or(defaultClass);

            if (!canCommitDataset(datasetState)) {
                this.logger.warn(String.format("Not committing dataset %s of job %s with commit policy %s and state %s", str, this.jobId, this.jobCommitPolicy, datasetState.getState()));
                checkForUnpublishedWUHandling(str, datasetState, publisherClass, closer);
                return false;
            }
        } catch (ReflectiveOperationException e) {
            this.logger.warn("Unable to find publisher class: " + e, e);
            return false;
        }

        boolean success = true;
        Optional<CommitSequence.Builder> commitSequenceBuilder = Optional.absent();
        try (Closer closer = Closer.create()) {
            if (z) {
                this.logger.info(String.format("Committing dataset %s of job %s with commit policy %s and state %s", str, this.jobId, this.jobCommitPolicy, datasetState.getState()));
                if (deliverySemantics == DeliverySemantics.EXACTLY_ONCE) {
                    commitSequenceBuilder = generateCommitSequenceBuilder(datasetState);
                } else {
                    commitDataset(datasetState, closer.register(DataPublisher.getInstance(publisherClass, this.jobState)));
                }
            } else if (datasetState.getState() == JobState.RunningState.SUCCESSFUL) {
                // Data were already committed by the tasks; only the dataset state needs promoting.
                datasetState.setState(JobState.RunningState.COMMITTED);
            }
        } catch (ReflectiveOperationException e) {
            this.logger.error(String.format("Failed to instantiate data publisher for dataset %s of job %s.", str, this.jobId), e);
            success = false;
        } catch (IOException e) {
            this.logger.error(String.format("Failed to commit dataset state for dataset %s of job %s", str, this.jobId), e);
            success = false;
        } finally {
            // The dataset state is finalized and persisted whether or not publishing succeeded.
            try {
                finalizeDatasetState(datasetState, str);
                if (commitSequenceBuilder.isPresent()) {
                    buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, str);
                    datasetState.setState(JobState.RunningState.COMMITTED);
                } else {
                    persistDatasetState(str, datasetState);
                }
            } catch (IOException | RuntimeException e) {
                this.logger.error(String.format("Failed to persist dataset state for dataset %s of job %s", str, this.jobId), e);
                success = false;
            }
        }
        return success;
    }

    /**
     * Lets the data publisher handle the work units of a dataset that is not being committed,
     * provided the publisher class implements {@link UnpublishedHandling}. Does nothing otherwise.
     *
     * @param str URN of the dataset
     * @param datasetState state of the dataset whose work units were not published
     * @param cls data publisher class configured for the job
     * @param closer closer with which the created publisher is registered; the caller closes it
     * @throws ReflectiveOperationException if the publisher cannot be instantiated
     * @throws IOException if the publisher fails to handle the unpublished work units
     */
    void checkForUnpublishedWUHandling(String str, JobState.DatasetState datasetState, Class<? extends DataPublisher> cls, Closer closer) throws ReflectiveOperationException, IOException {
        if (!UnpublishedHandling.class.isAssignableFrom(cls)) {
            return;
        }
        DataPublisher publisher = closer.register(DataPublisher.getInstance(cls, datasetState));
        this.logger.info(String.format("Calling publisher to handle unpublished work units for dataset %s of job %s.", str, this.jobId));
        // Safe: the isAssignableFrom check above guarantees the publisher implements the interface.
        ((UnpublishedHandling) publisher).handleUnpublishedWorkUnits(datasetState.getTaskStatesAsWorkUnitStates());
    }

    /**
     * Resolves the data publisher class configured for the job.
     *
     * <p>{@code data.publisher.job.type} takes precedence over {@code data.publisher.type}.
     *
     * @param state state holding the publisher configuration
     * @return the configured publisher class, or absent when neither property is set
     * @throws ReflectiveOperationException if the configured class cannot be loaded
     */
    private static Optional<Class<? extends DataPublisher>> getJobDataPublisherClass(State state) throws ReflectiveOperationException {
        String publisherType = state.getProp("data.publisher.job.type");
        if (Strings.isNullOrEmpty(publisherType)) {
            publisherType = state.getProp("data.publisher.type");
        }
        if (Strings.isNullOrEmpty(publisherType)) {
            LOG.info("Property data.publisher.job.type not specified");
            return Optional.absent();
        }
        @SuppressWarnings("unchecked") // the configured class is expected to be a DataPublisher
        Class<? extends DataPublisher> publisherClass = (Class<? extends DataPublisher>) Class.forName(publisherType);
        return Optional.<Class<? extends DataPublisher>>of(publisherClass);
    }

    /**
     * Tells whether data should be committed by the job rather than by individual tasks.
     *
     * @param state state holding the job configuration
     * @return {@code true} if the commit policy is {@code COMMIT_ON_FULL_SUCCESS}, if
     *         {@code publish.data.at.job.level} is true (the default), or if
     *         {@code data.publisher.job.type} is set
     */
    public static boolean shouldCommitDataInJob(State state) {
        if (JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
            return true;
        }
        if (state.getPropAsBoolean("publish.data.at.job.level", true)) {
            return true;
        }
        return !Strings.isNullOrEmpty(state.getProp("data.publisher.job.type"));
    }

    /**
     * Derives the running state of a dataset from its task states before committing.
     *
     * <p>Under {@code COMMIT_ON_FULL_SUCCESS} the first task that is not {@code SUCCESSFUL} marks
     * the dataset {@code FAILED} and increments its failure count. Otherwise the dataset is
     * marked {@code SUCCESSFUL} and its failure count is reset.
     *
     * @param datasetState dataset state to update in place
     */
    private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState) {
        for (TaskState taskState : datasetState.getTaskStates()) {
            boolean taskSucceeded = taskState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL;
            if (!taskSucceeded && this.jobCommitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
                datasetState.setState(JobState.RunningState.FAILED);
                datasetState.incrementJobFailures();
                return;
            }
        }
        datasetState.setState(JobState.RunningState.SUCCESSFUL);
        datasetState.setNoJobFailure();
    }

    /**
     * Tells whether the commit policy allows the given dataset to be committed.
     *
     * @param datasetState dataset state whose running state has already been derived
     * @return {@code true} for the partial-success and successful-tasks policies, and for the
     *         full-success policy only when the dataset is {@code SUCCESSFUL}
     */
    private boolean canCommitDataset(JobState.DatasetState datasetState) {
        if (this.jobCommitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS
                || this.jobCommitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS) {
            return true;
        }
        if (this.jobCommitPolicy != JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
            return false;
        }
        return datasetState.getState() == JobState.RunningState.SUCCESSFUL;
    }

    /**
     * Publishes the task states of a dataset and marks the dataset {@code COMMITTED}.
     *
     * <p>A publish failure is logged and recorded on every task state instead of being rethrown.
     * NOTE(review): the dataset is marked {@code COMMITTED} even when publishing failed — confirm
     * this is intended.
     *
     * @param datasetState dataset state whose task states are published
     * @param dataPublisher publisher used to publish the data
     */
    private static void commitDataset(JobState.DatasetState datasetState, DataPublisher dataPublisher) {
        try {
            dataPublisher.publish(datasetState.getTaskStates());
        } catch (Throwable publishFailure) {
            LOG.error("Failed to commit dataset", publishFailure);
            setTaskFailureException(datasetState.getTaskStates(), publishFailure);
        } finally {
            datasetState.setState(JobState.RunningState.COMMITTED);
        }
    }

    /**
     * Runs the dataset's task states through a {@link CommitSequencePublisher} and returns the
     * commit sequence builder it produced.
     *
     * <p>The publisher class is read from {@code data.publisher.type} of the dataset state and
     * must be a {@link CommitSequencePublisher}; the publisher is closed before returning.
     *
     * @param datasetState dataset state whose task states are published
     * @return the commit sequence builder reported by the publisher
     * @throws RuntimeException wrapping any failure, after the failure has been logged and
     *         recorded on every task state of the dataset
     */
    private Optional<CommitSequence.Builder> generateCommitSequenceBuilder(JobState.DatasetState datasetState) {
        try (Closer closer = Closer.create()) {
            @SuppressWarnings("unchecked") // the configured class is expected to be a DataPublisher
            Class<? extends DataPublisher> publisherClass = (Class<? extends DataPublisher>) Class.forName(
                    datasetState.getProp("data.publisher.type", "gobblin.publisher.BaseDataPublisher"));
            CommitSequencePublisher publisher =
                    (CommitSequencePublisher) closer.register(DataPublisher.getInstance(publisherClass, this.jobState));
            publisher.publish(datasetState.getTaskStates());
            return publisher.getCommitSequenceBuilder();
        } catch (Throwable t) {
            LOG.error("Failed to generate commit sequence", t);
            setTaskFailureException(datasetState.getTaskStates(), t);
            throw Throwables.propagate(t);
        }
    }

    /**
     * Completes the commit sequence with a dataset state commit step, stores it in the commit
     * sequence store, executes it, and removes it from the store once execution has finished.
     *
     * @param builder builder holding the commit steps collected so far
     * @param datasetState dataset state to be committed by the added step
     * @param str URN of the dataset
     * @throws IOException if storing, executing or deleting the commit sequence fails
     */
    private void buildAndExecuteCommitSequence(CommitSequence.Builder builder, JobState.DatasetState datasetState, String str) throws IOException {
        CommitStep datasetStateStep = buildDatasetStateCommitStep(str, datasetState).get();
        CommitSequence sequence = builder.addStep(datasetStateStep).build();

        CommitSequenceStore store = this.commitSequenceStore.get();
        store.put(sequence.getJobName(), str, sequence);
        sequence.execute();
        store.delete(sequence.getJobName(), str);
    }

    /**
     * Writes the state of a dataset to the dataset state store.
     *
     * @param str URN of the dataset
     * @param datasetState dataset state to persist
     * @throws IOException if the state store fails to persist the state
     */
    private void persistDatasetState(String str, JobState.DatasetState datasetState) throws IOException {
        final String message = "Persisting dataset state for dataset " + str;
        LOG.info(message);
        this.datasetStateStore.persistDatasetState(str, datasetState);
    }

    /**
     * Creates the commit step that commits the state of a dataset.
     *
     * @param str URN of the dataset
     * @param datasetState dataset state to commit; also used as the step's properties
     * @return the commit step, always present
     */
    private static Optional<CommitStep> buildDatasetStateCommitStep(String str, JobState.DatasetState datasetState) {
        String stepName = DatasetStateCommitStep.class.getSimpleName();
        LOG.info("Creating " + stepName + " for dataset " + str);
        return Optional.of(((DatasetStateCommitStep.Builder) new DatasetStateCommitStep.Builder().withProps(datasetState))
                .withDatasetUrn(str)
                .withDatasetState(datasetState)
                .build());
    }

    /**
     * Finalizes a dataset state before it is persisted.
     *
     * <p>Every task that is not {@code COMMITTED} has its actual high watermark backed off, and
     * under {@code COMMIT_ON_FULL_SUCCESS} such a task also marks the dataset {@code FAILED}.
     * The dataset ID is set to the dataset URN.
     *
     * @param datasetState dataset state to update in place
     * @param str URN of the dataset
     */
    private void finalizeDatasetState(JobState.DatasetState datasetState, String str) {
        for (TaskState taskState : datasetState.getTaskStates()) {
            if (taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
                continue;
            }
            taskState.backoffActualHighWatermark();
            if (this.jobCommitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
                datasetState.setState(JobState.RunningState.FAILED);
            }
        }
        datasetState.setId(str);
    }

    /**
     * Records the given failure on every task state in the collection.
     *
     * @param collection task states to update
     * @param th failure to record
     */
    private static void setTaskFailureException(Collection<TaskState> collection, Throwable th) {
        for (TaskState taskState : collection) {
            taskState.setTaskFailureException(th);
        }
    }

    /**
     * @return the delivery semantics parsed from the job state at construction time
     */
    public DeliverySemantics getSemantics() {
        return semantics;
    }

    /**
     * @return the commit sequence store, present only for {@code EXACTLY_ONCE} delivery
     */
    public Optional<CommitSequenceStore> getCommitSequenceStore() {
        return commitSequenceStore;
    }
}
