package gobblin.runtime;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.io.Closer;
import gobblin.commit.CommitSequence;
import gobblin.commit.CommitStep;
import gobblin.commit.DeliverySemantics;
import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.WorkUnitState;
import gobblin.publisher.CommitSequencePublisher;
import gobblin.publisher.DataPublisher;
import gobblin.publisher.UnpublishedHandling;
import gobblin.runtime.JobState;
import gobblin.runtime.commit.DatasetStateCommitStep;
import gobblin.runtime.task.TaskFactory;
import gobblin.runtime.task.TaskUtils;
import gobblin.source.extractor.JobCommitPolicy;
import java.beans.ConstructorProperties;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/gobblin-runtime-0.11.0.jar:gobblin/runtime/SafeDatasetCommit.class */
public final class SafeDatasetCommit implements Callable<Void> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SafeDatasetCommit.class);
    private static final Object GLOBAL_LOCK = new Object();
    private final boolean shouldCommitDataInJob;
    private final boolean isJobCancelled;
    private final DeliverySemantics deliverySemantics;
    private final String datasetUrn;
    private final JobState.DatasetState datasetState;
    private final boolean isMultithreaded;
    private final JobContext jobContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/gobblin-runtime-0.11.0.jar:gobblin/runtime/SafeDatasetCommit$TaskFactoryWrapper.class */
    public static class TaskFactoryWrapper {
        private final TaskFactory taskFactory;

        public boolean equals(Object obj) {
            if (obj instanceof TaskFactoryWrapper) {
                return this.taskFactory == null ? ((TaskFactoryWrapper) obj).taskFactory == null : ((TaskFactoryWrapper) obj).taskFactory != null && this.taskFactory.getClass().equals(((TaskFactoryWrapper) obj).taskFactory.getClass());
            }
            return false;
        }

        public int hashCode() {
            Class<?> cls = this.taskFactory == null ? null : this.taskFactory.getClass();
            return (1 * 59) + (cls == null ? 43 : cls.hashCode());
        }

        @ConstructorProperties({"taskFactory"})
        public TaskFactoryWrapper(TaskFactory taskFactory) {
            this.taskFactory = taskFactory;
        }

        public TaskFactory getTaskFactory() {
            return this.taskFactory;
        }

        public String toString() {
            return "SafeDatasetCommit.TaskFactoryWrapper(taskFactory=" + getTaskFactory() + ")";
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /* JADX WARN: Failed to calculate best type for var: r13v1 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException
     */
    /* JADX WARN: Not initialized variable reg: 13, insn: 0x0346: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r13 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:118:0x0346 */
    /* JADX WARN: Type inference failed for: r11v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r11v1 */
    /* JADX WARN: Type inference failed for: r11v2 */
    /* JADX WARN: Type inference failed for: r12v6, types: [java.lang.Throwable, com.google.common.io.Closer] */
    /* JADX WARN: Type inference failed for: r13v1, types: [java.lang.Throwable] */
    @Override // java.util.concurrent.Callable
    public Void call() throws Exception {
        ?? r13;
        if (this.datasetState.getState() == JobState.RunningState.COMMITTED) {
            log.info(this.datasetUrn + " have been committed.");
            return null;
        }
        finalizeDatasetStateBeforeCommit(this.datasetState);
        try {
            Closer create = Closer.create();
            ?? r11 = 0;
            try {
                try {
                    Class<? extends DataPublisher> cls = (Class) JobContext.getJobDataPublisherClass(this.jobContext.getJobState()).or((Optional) Class.forName(ConfigurationKeys.DEFAULT_DATA_PUBLISHER_TYPE));
                    if (!canCommitDataset(this.datasetState)) {
                        log.warn(String.format("Not committing dataset %s of job %s with commit policy %s and state %s", this.datasetUrn, this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
                        checkForUnpublishedWUHandling(this.datasetUrn, this.datasetState, cls, create);
                        throw new RuntimeException(String.format("Not committing dataset %s of job %s with commit policy %s and state %s", this.datasetUrn, this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
                    }
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th) {
                                r11.addSuppressed(th);
                            }
                        } else {
                            create.close();
                        }
                    }
                    if (this.isJobCancelled) {
                        log.info("Executing commit steps although job is cancelled due to job commit policy: " + this.jobContext.getJobCommitPolicy());
                    }
                    Optional absent = Optional.absent();
                    try {
                        try {
                            try {
                                Closer create2 = Closer.create();
                                Throwable th2 = null;
                                if (this.shouldCommitDataInJob) {
                                    log.info(String.format("Committing dataset %s of job %s with commit policy %s and state %s", this.datasetUrn, this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
                                    for (Map.Entry<TaskFactoryWrapper, Collection<TaskState>> entry : groupByTaskFactory(this.datasetState).asMap().entrySet()) {
                                        TaskFactory taskFactory = entry.getKey().getTaskFactory();
                                        if (this.deliverySemantics != DeliverySemantics.EXACTLY_ONCE) {
                                            DataPublisher createDataPublisher = taskFactory == null ? (DataPublisher) create2.register(DataPublisher.getInstance(cls, this.jobContext.getJobState())) : taskFactory.createDataPublisher(this.datasetState);
                                            if (this.isJobCancelled) {
                                                if (!createDataPublisher.canBeSkipped()) {
                                                    throw new RuntimeException("Cannot persist state upon cancellation because publisher has unfinished work and cannot be skipped.");
                                                }
                                                log.warn(createDataPublisher.getClass() + " will be skipped.");
                                            } else if (!this.isMultithreaded || createDataPublisher.isThreadSafe()) {
                                                commitDataset(entry.getValue(), createDataPublisher);
                                            } else {
                                                log.warn(String.format("Gobblin is set up to parallelize publishing, however the publisher %s is not thread-safe. Falling back to serial publishing.", createDataPublisher.getClass().getName()));
                                                safeCommitDataset(entry.getValue(), createDataPublisher);
                                            }
                                        } else {
                                            if (taskFactory != null) {
                                                throw new RuntimeException("Custom task factories do not support exactly once delivery semantics.");
                                            }
                                            generateCommitSequenceBuilder(this.datasetState, entry.getValue());
                                        }
                                    }
                                    this.datasetState.setState(JobState.RunningState.COMMITTED);
                                } else if (this.datasetState.getState() == JobState.RunningState.SUCCESSFUL) {
                                    this.datasetState.setState(JobState.RunningState.COMMITTED);
                                }
                                if (create2 != null) {
                                    if (0 != 0) {
                                        try {
                                            create2.close();
                                        } catch (Throwable th3) {
                                            th2.addSuppressed(th3);
                                        }
                                    } else {
                                        create2.close();
                                    }
                                }
                                try {
                                    finalizeDatasetState(this.datasetState, this.datasetUrn);
                                    if (absent.isPresent()) {
                                        buildAndExecuteCommitSequence((CommitSequence.Builder) absent.get(), this.datasetState, this.datasetUrn);
                                        this.datasetState.setState(JobState.RunningState.COMMITTED);
                                    } else if (1 != 0) {
                                        persistDatasetState(this.datasetUrn, this.datasetState);
                                    }
                                    return null;
                                } catch (IOException | RuntimeException e) {
                                    log.error(String.format("Failed to persist dataset state for dataset %s of job %s", this.datasetUrn, this.jobContext.getJobId()), e);
                                    throw new RuntimeException(e);
                                }
                            } catch (Throwable th4) {
                                try {
                                    finalizeDatasetState(this.datasetState, this.datasetUrn);
                                    if (absent.isPresent()) {
                                        buildAndExecuteCommitSequence((CommitSequence.Builder) absent.get(), this.datasetState, this.datasetUrn);
                                        this.datasetState.setState(JobState.RunningState.COMMITTED);
                                    } else if (1 != 0) {
                                        persistDatasetState(this.datasetUrn, this.datasetState);
                                    }
                                    throw th4;
                                } catch (IOException | RuntimeException e2) {
                                    log.error(String.format("Failed to persist dataset state for dataset %s of job %s", this.datasetUrn, this.jobContext.getJobId()), e2);
                                    throw new RuntimeException(e2);
                                }
                            }
                        } catch (Throwable th5) {
                            if (th != 0) {
                                if (r13 != 0) {
                                    try {
                                        th.close();
                                    } catch (Throwable th6) {
                                        r13.addSuppressed(th6);
                                    }
                                } else {
                                    th.close();
                                }
                            }
                            throw th5;
                        }
                    } catch (ReflectiveOperationException e3) {
                        log.error(String.format("Failed to instantiate data publisher for dataset %s of job %s.", this.datasetUrn, this.jobContext.getJobId()), (Throwable) e3);
                        throw new RuntimeException(e3);
                    } catch (Throwable th7) {
                        log.error(String.format("Failed to commit dataset state for dataset %s of job %s", this.datasetUrn, this.jobContext.getJobId()), th7);
                        throw new RuntimeException(th7);
                    }
                } finally {
                }
            } finally {
            }
        } catch (ReflectiveOperationException e4) {
            log.error("Failed to instantiate data publisher for dataset %s of job %s.", this.datasetUrn, this.jobContext.getJobId(), e4);
            throw new RuntimeException(e4);
        }
    }

    private void safeCommitDataset(Collection<TaskState> collection, DataPublisher dataPublisher) {
        synchronized (GLOBAL_LOCK) {
            commitDataset(collection, dataPublisher);
        }
    }

    private void commitDataset(Collection<TaskState> collection, DataPublisher dataPublisher) {
        try {
            dataPublisher.publish(collection);
        } catch (Throwable th) {
            log.error("Failed to commit dataset", th);
            setTaskFailureException(collection, th);
        }
    }

    private ListMultimap<TaskFactoryWrapper, TaskState> groupByTaskFactory(JobState.DatasetState datasetState) {
        ArrayListMultimap create = ArrayListMultimap.create();
        for (TaskState taskState : datasetState.getTaskStates()) {
            create.put(new TaskFactoryWrapper(TaskUtils.getTaskFactory(taskState).orNull()), taskState);
        }
        return create;
    }

    private synchronized void buildAndExecuteCommitSequence(CommitSequence.Builder builder, JobState.DatasetState datasetState, String str) throws IOException {
        CommitSequence build = builder.addStep(buildDatasetStateCommitStep(str, datasetState).get()).build();
        this.jobContext.getCommitSequenceStore().get().put(build.getJobName(), str, build);
        build.execute();
        this.jobContext.getCommitSequenceStore().get().delete(build.getJobName(), str);
    }

    private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState) {
        Iterator<TaskState> it = datasetState.getTaskStates().iterator();
        while (it.hasNext()) {
            if (it.next().getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL && this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
                datasetState.setState(JobState.RunningState.FAILED);
                datasetState.incrementJobFailures();
                return;
            }
        }
        datasetState.setState(JobState.RunningState.SUCCESSFUL);
        datasetState.setNoJobFailure();
    }

    private boolean canCommitDataset(JobState.DatasetState datasetState) {
        return this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS || this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || (this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS && datasetState.getState() == JobState.RunningState.SUCCESSFUL);
    }

    private Optional<CommitSequence.Builder> generateCommitSequenceBuilder(JobState.DatasetState datasetState, Collection<TaskState> collection) {
        try {
            Closer create = Closer.create();
            Throwable th = null;
            try {
                CommitSequencePublisher commitSequencePublisher = (CommitSequencePublisher) create.register(DataPublisher.getInstance(Class.forName(datasetState.getProp(ConfigurationKeys.DATA_PUBLISHER_TYPE, ConfigurationKeys.DEFAULT_DATA_PUBLISHER_TYPE)), this.jobContext.getJobState()));
                commitSequencePublisher.publish(collection);
                Optional<CommitSequence.Builder> commitSequenceBuilder = commitSequencePublisher.getCommitSequenceBuilder();
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return commitSequenceBuilder;
            } finally {
            }
        } catch (Throwable th3) {
            log.error("Failed to generate commit sequence", th3);
            setTaskFailureException(datasetState.getTaskStates(), th3);
            throw Throwables.propagate(th3);
        }
    }

    void checkForUnpublishedWUHandling(String str, JobState.DatasetState datasetState, Class<? extends DataPublisher> cls, Closer closer) throws ReflectiveOperationException, IOException {
        if (UnpublishedHandling.class.isAssignableFrom(cls)) {
            Closeable closeable = (DataPublisher) closer.register(DataPublisher.getInstance(cls, this.jobContext.getJobState()));
            log.info(String.format("Calling publisher to handle unpublished work units for dataset %s of job %s.", str, this.jobContext.getJobId()));
            ((UnpublishedHandling) closeable).handleUnpublishedWorkUnits(datasetState.getTaskStatesAsWorkUnitStates());
        }
    }

    private void finalizeDatasetState(JobState.DatasetState datasetState, String str) {
        for (TaskState taskState : datasetState.getTaskStates()) {
            if (taskState.getWorkingState() != WorkUnitState.WorkingState.COMMITTED) {
                taskState.backoffActualHighWatermark();
                if (this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
                    datasetState.setState(JobState.RunningState.FAILED);
                }
            }
        }
        datasetState.setId(str);
    }

    private void persistDatasetState(String str, JobState.DatasetState datasetState) throws IOException {
        log.info("Persisting dataset state for dataset " + str);
        this.jobContext.getDatasetStateStore().persistDatasetState(str, datasetState);
    }

    private static void setTaskFailureException(Collection<TaskState> collection, Throwable th) {
        Iterator<TaskState> it = collection.iterator();
        while (it.hasNext()) {
            it.next().setTaskFailureException(th);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static Optional<CommitStep> buildDatasetStateCommitStep(String str, JobState.DatasetState datasetState) {
        log.info("Creating " + DatasetStateCommitStep.class.getSimpleName() + " for dataset " + str);
        return Optional.of(((DatasetStateCommitStep.Builder) new DatasetStateCommitStep.Builder().withProps(datasetState)).withDatasetUrn(str).withDatasetState(datasetState).build());
    }

    @ConstructorProperties({"shouldCommitDataInJob", "isJobCancelled", "deliverySemantics", "datasetUrn", "datasetState", "isMultithreaded", "jobContext"})
    public SafeDatasetCommit(boolean z, boolean z2, DeliverySemantics deliverySemantics, String str, JobState.DatasetState datasetState, boolean z3, JobContext jobContext) {
        this.shouldCommitDataInJob = z;
        this.isJobCancelled = z2;
        this.deliverySemantics = deliverySemantics;
        this.datasetUrn = str;
        this.datasetState = datasetState;
        this.isMultithreaded = z3;
        this.jobContext = jobContext;
    }
}
