package gobblin.runtime;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import gobblin.Constructs;
import gobblin.commit.SpeculativeAttemptAwareConstruct;
import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.converter.Converter;
import gobblin.fork.CopyHelper;
import gobblin.fork.CopyNotSupportedException;
import gobblin.fork.Copyable;
import gobblin.fork.ForkOperator;
import gobblin.instrumented.extractor.InstrumentedExtractorBase;
import gobblin.instrumented.extractor.InstrumentedExtractorDecorator;
import gobblin.metrics.event.EventSubmitter;
import gobblin.publisher.DataPublisher;
import gobblin.publisher.SingleTaskDataPublisher;
import gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
import gobblin.qualitychecker.row.RowLevelPolicyChecker;
import gobblin.runtime.fork.AsynchronousFork;
import gobblin.runtime.fork.Fork;
import gobblin.runtime.fork.SynchronousFork;
import gobblin.runtime.task.TaskIFace;
import gobblin.runtime.util.TaskMetrics;
import gobblin.source.extractor.Extractor;
import gobblin.source.extractor.JobCommitPolicy;
import gobblin.source.extractor.RecordEnvelope;
import gobblin.source.extractor.StreamingExtractor;
import gobblin.state.ConstructState;
import gobblin.util.ConfigUtils;
import gobblin.writer.AcknowledgableRecordEnvelope;
import gobblin.writer.AcknowledgableWatermark;
import gobblin.writer.FineGrainedWatermarkTracker;
import gobblin.writer.MultiWriterWatermarkManager;
import gobblin.writer.TrackerBasedWatermarkManager;
import gobblin.writer.WatermarkAwareWriter;
import gobblin.writer.WatermarkManager;
import gobblin.writer.WatermarkStorage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.BooleanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:gobblin/runtime/Task.class */
public class Task implements TaskIFace {
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    private final String jobId;
    private final String taskId;
    private final String taskKey;
    private final TaskContext taskContext;
    private final TaskState taskState;
    private final TaskStateTracker taskStateTracker;
    private final TaskExecutor taskExecutor;
    private final Optional<CountDownLatch> countDownLatch;
    private final Map<Optional<Fork>, Optional<Future<?>>> forks;
    private final AtomicInteger retryCount;
    private final Converter converter;
    private final InstrumentedExtractorBase extractor;
    private final RowLevelPolicyChecker rowChecker;
    private final ExecutionModel taskMode;
    private final String watermarkingStrategy;
    private final Optional<WatermarkManager> watermarkManager;
    private final Optional<FineGrainedWatermarkTracker> watermarkTracker;
    private final Optional<WatermarkStorage> watermarkStorage;
    private final Closer closer;
    private long startTime;
    private volatile long lastRecordPulledTimestampMillis;
    private final AtomicLong recordsPulled;
    private final AtomicBoolean shutdownRequested;
    private final CountDownLatch shutdownLatch;

    public Task(TaskContext taskContext, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, Optional<CountDownLatch> optional) {
        Config empty;
        this.forks = Maps.newLinkedHashMap();
        this.retryCount = new AtomicInteger();
        this.taskContext = taskContext;
        this.taskState = taskContext.getTaskState();
        this.jobId = this.taskState.getJobId();
        this.taskId = this.taskState.getTaskId();
        this.taskKey = this.taskState.getTaskKey();
        this.taskStateTracker = taskStateTracker;
        this.taskExecutor = taskExecutor;
        this.countDownLatch = optional;
        this.closer = Closer.create();
        this.closer.register(this.taskState.getTaskBrokerNullable());
        this.extractor = this.closer.register(new InstrumentedExtractorDecorator(this.taskState, this.taskContext.getExtractor()));
        this.converter = this.closer.register(new MultiConverter(this.taskContext.getConverters()));
        try {
            this.rowChecker = this.closer.register(this.taskContext.getRowLevelPolicyChecker());
            this.taskMode = getExecutionModel(this.taskState);
            this.recordsPulled = new AtomicLong(0L);
            this.lastRecordPulledTimestampMillis = 0L;
            this.shutdownRequested = new AtomicBoolean(false);
            this.shutdownLatch = new CountDownLatch(1);
            this.watermarkingStrategy = "FineGrain";
            if (!isStreamingTask()) {
                this.watermarkManager = Optional.absent();
                this.watermarkTracker = Optional.absent();
                this.watermarkStorage = Optional.absent();
                return;
            }
            Extractor rawSourceExtractor = this.taskContext.getRawSourceExtractor();
            if (!(rawSourceExtractor instanceof StreamingExtractor)) {
                LOG.error("Extractor {}  is not an instance of StreamingExtractor but the task is configured to run in continuous mode", rawSourceExtractor.getClass().getName());
                throw new TaskInstantiationException("Extraction " + rawSourceExtractor.getClass().getName() + " is not an instance of StreamingExtractor but the task is configured to run in continuous mode");
            }
            this.watermarkStorage = Optional.of(this.taskContext.getWatermarkStorage());
            try {
                empty = ConfigUtils.propertiesToConfig(this.taskState.getProperties());
            } catch (Exception e) {
                LOG.warn("Failed to deserialize taskState into Config.. continuing with an empty config", e);
                empty = ConfigFactory.empty();
            }
            long longValue = ConfigUtils.getLong(empty, TaskConfigurationKeys.STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS, TaskConfigurationKeys.DEFAULT_STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS).longValue();
            if (this.watermarkingStrategy.equals("FineGrain")) {
                this.watermarkTracker = Optional.of(this.closer.register(new FineGrainedWatermarkTracker(empty)));
                this.watermarkManager = Optional.of(this.closer.register(new TrackerBasedWatermarkManager((WatermarkStorage) this.watermarkStorage.get(), (FineGrainedWatermarkTracker) this.watermarkTracker.get(), longValue, Optional.of(LOG))));
            } else {
                this.watermarkManager = Optional.of(this.closer.register(new MultiWriterWatermarkManager((WatermarkStorage) this.watermarkStorage.get(), longValue, Optional.of(LOG))));
                this.watermarkTracker = Optional.absent();
            }
        } catch (Exception e2) {
            try {
                this.closer.close();
            } catch (Throwable th) {
                LOG.error("Failed to close all open resources", th);
            }
            throw new RuntimeException("Failed to instantiate row checker.", e2);
        }
    }

    public static ExecutionModel getExecutionModel(State state) {
        String prop = state.getProp(TaskConfigurationKeys.TASK_EXECUTION_MODE, TaskConfigurationKeys.DEFAULT_TASK_EXECUTION_MODE);
        try {
            return ExecutionModel.valueOf(prop.toUpperCase());
        } catch (Exception e) {
            LOG.warn("Could not find an execution model corresponding to {}, returning {}", new Object[]{prop, ExecutionModel.BATCH, e});
            return ExecutionModel.BATCH;
        }
    }

    private boolean areSingleBranchTasksSynchronous(TaskContext taskContext) {
        return BooleanUtils.toBoolean(taskContext.getTaskState().getProp(TaskConfigurationKeys.TASK_IS_SINGLE_BRANCH_SYNCHRONOUS, TaskConfigurationKeys.DEFAULT_TASK_IS_SINGLE_BRANCH_SYNCHRONOUS));
    }

    private boolean isStreamingTask() {
        return this.taskMode.equals(ExecutionModel.STREAMING);
    }

    @Override // gobblin.runtime.task.TaskIFace
    public boolean awaitShutdown(long j) throws InterruptedException {
        return this.shutdownLatch.await(j, TimeUnit.MILLISECONDS);
    }

    private void completeShutdown() {
        this.shutdownLatch.countDown();
    }

    private boolean shutdownRequested() {
        if (!this.shutdownRequested.get()) {
            this.shutdownRequested.set(Thread.currentThread().isInterrupted());
        }
        return this.shutdownRequested.get();
    }

    @Override // gobblin.runtime.task.TaskIFace
    public void shutdown() {
        this.shutdownRequested.set(true);
    }

    @Override // gobblin.runtime.task.TaskIFace
    public String getProgress() {
        long currentTimeMillis = System.currentTimeMillis();
        long j = currentTimeMillis - this.lastRecordPulledTimestampMillis;
        if (!isStreamingTask()) {
            return String.format("recordsPulled:%d, lastRecordExtracted: %d ms ago", Long.valueOf(this.recordsPulled.get()), Long.valueOf(j));
        }
        WatermarkManager.CommitStatus commitStatus = ((WatermarkManager) this.watermarkManager.get()).getCommitStatus();
        return String.format("recordsPulled:%d, lastRecordExtracted: %d ms ago, lastWatermarkCommitted: %d ms ago, lastWatermarkCommitted: %s", Long.valueOf(this.recordsPulled.get()), Long.valueOf(j), Long.valueOf(currentTimeMillis - commitStatus.getLastWatermarkCommitSuccessTimestampMillis()), commitStatus.getLastCommittedWatermarks());
    }

    @Override // gobblin.runtime.task.TaskIFace, java.lang.Runnable
    public void run() {
        RecordEnvelope recordEnvelope;
        MDC.put("task.key", this.taskKey);
        this.startTime = System.currentTimeMillis();
        this.taskState.setStartTime(this.startTime);
        this.taskState.setWorkingState(WorkUnitState.WorkingState.RUNNING);
        this.forks.clear();
        try {
            try {
                ForkOperator forkOperator = (ForkOperator) this.closer.register(this.taskContext.getForkOperator());
                forkOperator.init(this.taskState);
                int branches = forkOperator.getBranches(this.taskState);
                this.taskState.setProp("fork.branches", Integer.valueOf(branches));
                Object convertSchema = this.converter.convertSchema(this.extractor.getSchema(), this.taskState);
                List forkSchema = forkOperator.forkSchema(this.taskState, convertSchema);
                if (forkSchema.size() != branches) {
                    throw new ForkBranchMismatchException(String.format("Number of forked schemas [%d] is not equal to number of branches [%d]", Integer.valueOf(forkSchema.size()), Integer.valueOf(branches)));
                }
                if (inMultipleBranches(forkSchema) && !CopyHelper.isCopyable(convertSchema)) {
                    throw new CopyNotSupportedException(convertSchema + " is not copyable");
                }
                RowLevelPolicyChecker rowLevelPolicyChecker = (RowLevelPolicyChecker) this.closer.register(this.taskContext.getRowLevelPolicyChecker());
                RowLevelPolicyCheckResults rowLevelPolicyCheckResults = new RowLevelPolicyCheckResults();
                if (!areSingleBranchTasksSynchronous(this.taskContext) || branches > 1) {
                    for (int i = 0; i < branches; i++) {
                        if (((Boolean) forkSchema.get(i)).booleanValue()) {
                            AsynchronousFork asynchronousFork = (AsynchronousFork) this.closer.register(new AsynchronousFork(this.taskContext, convertSchema instanceof Copyable ? ((Copyable) convertSchema).copy() : convertSchema, branches, i, this.taskMode));
                            configureStreamingFork(asynchronousFork, this.watermarkingStrategy);
                            this.forks.put(Optional.of(asynchronousFork), Optional.of(this.taskExecutor.submit(asynchronousFork)));
                        } else {
                            this.forks.put(Optional.absent(), Optional.absent());
                        }
                    }
                } else {
                    SynchronousFork synchronousFork = (SynchronousFork) this.closer.register(new SynchronousFork(this.taskContext, convertSchema instanceof Copyable ? ((Copyable) convertSchema).copy() : convertSchema, branches, 0, this.taskMode));
                    configureStreamingFork(synchronousFork, this.watermarkingStrategy);
                    this.forks.put(Optional.of(synchronousFork), Optional.of(this.taskExecutor.submit(synchronousFork)));
                }
                if (!isStreamingTask()) {
                    while (true) {
                        Object readRecord = this.extractor.readRecord((Object) null);
                        if (readRecord == null) {
                            break;
                        }
                        onRecordExtract();
                        Iterator it = this.converter.convertRecord(convertSchema, readRecord, this.taskState).iterator();
                        while (it.hasNext()) {
                            processRecord(it.next(), forkOperator, rowLevelPolicyChecker, rowLevelPolicyCheckResults, branches, null);
                        }
                    }
                } else {
                    if (this.watermarkTracker.isPresent()) {
                        ((FineGrainedWatermarkTracker) this.watermarkTracker.get()).start();
                    }
                    ((WatermarkManager) this.watermarkManager.get()).start();
                    this.taskContext.getRawSourceExtractor().start((WatermarkStorage) this.watermarkStorage.get());
                    while (!shutdownRequested() && (recordEnvelope = (RecordEnvelope) this.extractor.readRecord((Object) null)) != null) {
                        onRecordExtract();
                        AcknowledgableWatermark acknowledgableWatermark = new AcknowledgableWatermark(recordEnvelope.getWatermark());
                        if (this.watermarkTracker.isPresent()) {
                            ((FineGrainedWatermarkTracker) this.watermarkTracker.get()).track(acknowledgableWatermark);
                        }
                        Iterator it2 = this.converter.convertRecord(convertSchema, recordEnvelope.getRecord(), this.taskState).iterator();
                        while (it2.hasNext()) {
                            processRecord(it2.next(), forkOperator, rowLevelPolicyChecker, rowLevelPolicyCheckResults, branches, acknowledgableWatermark.incrementAck());
                        }
                        acknowledgableWatermark.ack();
                    }
                }
                LOG.info("Extracted " + this.recordsPulled + " data records");
                LOG.info("Row quality checker finished with results: " + rowLevelPolicyCheckResults.getResults());
                this.taskState.setProp("qualitychecker.rows.extracted", this.recordsPulled);
                this.taskState.setProp("qualitychecker.rows.expected", Long.valueOf(this.extractor.getExpectedRecordCount()));
                for (Optional<Fork> optional : this.forks.keySet()) {
                    if (optional.isPresent()) {
                        ((Fork) optional.get()).markParentTaskDone();
                    }
                }
                for (Optional<Future<?>> optional2 : this.forks.values()) {
                    if (optional2.isPresent()) {
                        try {
                            long nanoTime = System.nanoTime();
                            ((Future) optional2.get()).get();
                            LOG.info("Task shutdown: Fork future reaped in {} millis", Long.valueOf((System.nanoTime() - nanoTime) / 1000000));
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
                if (this.watermarkManager.isPresent()) {
                    ((WatermarkManager) this.watermarkManager.get()).close();
                }
                if (this.watermarkTracker.isPresent()) {
                    ((FineGrainedWatermarkTracker) this.watermarkTracker.get()).close();
                }
                this.taskStateTracker.onTaskRunCompletion(this);
                completeShutdown();
            } catch (Throwable th) {
                failTask(th);
                this.taskStateTracker.onTaskRunCompletion(this);
                completeShutdown();
            }
        } catch (Throwable th2) {
            this.taskStateTracker.onTaskRunCompletion(this);
            completeShutdown();
            throw th2;
        }
    }

    private void configureStreamingFork(Fork fork, String str) throws IOException {
        if (isStreamingTask()) {
            WatermarkAwareWriter writer = fork.getWriter();
            if (!(writer instanceof WatermarkAwareWriter)) {
                String format = String.format("The Task is configured to run in continuous mode, but the writer %s is not a WatermarkAwareWriter", writer.getClass().getName());
                LOG.error(format);
                throw new RuntimeException(format);
            }
            if (str.equals("WriterBased")) {
                ((MultiWriterWatermarkManager) this.watermarkManager.get()).registerWriter(writer);
            }
        }
    }

    private void onRecordExtract() {
        this.recordsPulled.incrementAndGet();
        this.lastRecordPulledTimestampMillis = System.currentTimeMillis();
    }

    private void failTask(Throwable th) {
        LOG.error(String.format("Task %s failed", this.taskId), th);
        this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED);
        this.taskState.setProp("task.failure.exception", Throwables.getStackTraceAsString(th));
    }

    private boolean shouldPublishDataInTask() {
        if (this.taskState.getPropAsBoolean("publish.data.at.job.level", true)) {
            LOG.info(String.format("%s is true. Will publish data at the job level.", "publish.data.at.job.level"));
            return false;
        }
        JobCommitPolicy commitPolicy = JobCommitPolicy.getCommitPolicy(this.taskState);
        if (commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS) {
            return this.taskState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL;
        }
        if (commitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS) {
            return true;
        }
        LOG.info("Will publish data at the job level with job commit policy: " + commitPolicy);
        return false;
    }

    private void publishTaskData() throws IOException {
        Closer create = Closer.create();
        try {
            try {
                SingleTaskDataPublisher register = create.register(SingleTaskDataPublisher.getInstance(getTaskPublisherClass(), this.taskState));
                LOG.info("Publishing data from task " + this.taskId);
                register.publish(this.taskState);
                create.close();
            } catch (ClassCastException e) {
                LOG.error(String.format("To publish data in task, the publisher class must extend %s", SingleTaskDataPublisher.class.getSimpleName()), e);
                this.taskState.setTaskFailureException(e);
                throw create.rethrow(e);
            } catch (Throwable th) {
                this.taskState.setTaskFailureException(th);
                throw create.rethrow(th);
            }
        } catch (Throwable th2) {
            create.close();
            throw th2;
        }
    }

    private Class<? extends DataPublisher> getTaskPublisherClass() throws ReflectiveOperationException {
        return this.taskState.contains("data.publisher.task.type") ? Class.forName(this.taskState.getProp("data.publisher.task.type")) : Class.forName(this.taskState.getProp("data.publisher.type", "gobblin.publisher.BaseDataPublisher"));
    }

    public String getJobId() {
        return this.jobId;
    }

    public String getTaskId() {
        return this.taskId;
    }

    public String getTaskKey() {
        return this.taskKey;
    }

    public TaskContext getTaskContext() {
        return this.taskContext;
    }

    public TaskState getTaskState() {
        return this.taskState;
    }

    @Override // gobblin.runtime.task.TaskIFace
    public State getPersistentState() {
        return getTaskState();
    }

    @Override // gobblin.runtime.task.TaskIFace
    public State getExecutionMetadata() {
        return getTaskState();
    }

    @Override // gobblin.runtime.task.TaskIFace
    public WorkUnitState.WorkingState getWorkingState() {
        return getTaskState().getWorkingState();
    }

    public List<Optional<Fork>> getForks() {
        return ImmutableList.copyOf(this.forks.keySet());
    }

    public void updateRecordMetrics() {
        for (Optional<Fork> optional : this.forks.keySet()) {
            if (optional.isPresent()) {
                ((Fork) optional.get()).updateRecordMetrics();
            }
        }
    }

    public void updateByteMetrics() {
        try {
            for (Optional<Fork> optional : this.forks.keySet()) {
                if (optional.isPresent()) {
                    ((Fork) optional.get()).updateByteMetrics();
                }
            }
        } catch (IOException e) {
            LOG.error("Failed to update byte-level metrics for task " + this.taskId, e);
        }
    }

    public void incrementRetryCount() {
        this.retryCount.incrementAndGet();
    }

    public int getRetryCount() {
        return this.retryCount.get();
    }

    public void markTaskCompletion() {
        if (this.countDownLatch.isPresent()) {
            ((CountDownLatch) this.countDownLatch.get()).countDown();
        }
        this.taskState.setProp("task.retries", Integer.valueOf(this.retryCount.get()));
    }

    public String toString() {
        return this.taskId;
    }

    private void processRecord(Object obj, ForkOperator forkOperator, RowLevelPolicyChecker rowLevelPolicyChecker, RowLevelPolicyCheckResults rowLevelPolicyCheckResults, int i, AcknowledgableWatermark acknowledgableWatermark) throws Exception {
        if (!rowLevelPolicyChecker.executePolicies(obj, rowLevelPolicyCheckResults)) {
            if (acknowledgableWatermark != null) {
                acknowledgableWatermark.ack();
                return;
            }
            return;
        }
        List forkDataRecord = forkOperator.forkDataRecord(this.taskState, obj);
        if (forkDataRecord.size() != i) {
            throw new ForkBranchMismatchException(String.format("Number of forked data records [%d] is not equal to number of branches [%d]", Integer.valueOf(forkDataRecord.size()), Integer.valueOf(i)));
        }
        if (inMultipleBranches(forkDataRecord) && !CopyHelper.isCopyable(obj)) {
            throw new CopyNotSupportedException(obj.getClass().getName() + " is not copyable");
        }
        int i2 = 0;
        int i3 = 0;
        for (Optional<Fork> optional : this.forks.keySet()) {
            if (optional.isPresent() && ((Boolean) forkDataRecord.get(i2)).booleanValue()) {
                Object copy = CopyHelper.copy(obj, i3);
                i3++;
                if (isStreamingTask()) {
                    copy = new AcknowledgableRecordEnvelope(copy, acknowledgableWatermark.incrementAck());
                }
                for (boolean z = false; !z; z = ((Fork) optional.get()).putRecord(copy)) {
                }
            }
            i2++;
        }
        if (acknowledgableWatermark != null) {
            acknowledgableWatermark.ack();
        }
    }

    private static boolean inMultipleBranches(List<Boolean> list) {
        int i = 0;
        Iterator<Boolean> it = list.iterator();
        while (it.hasNext()) {
            if (it.next().booleanValue()) {
                i++;
                if (i > 1) {
                    break;
                }
            }
        }
        return i > 1;
    }

    private long getRecordsWritten() {
        long j = 0;
        Iterator<Optional<Fork>> it = this.forks.keySet().iterator();
        while (it.hasNext()) {
            j += ((Fork) it.next().get()).getRecordsWritten();
        }
        return j;
    }

    private long getBytesWritten() {
        long j = 0;
        Iterator<Optional<Fork>> it = this.forks.keySet().iterator();
        while (it.hasNext()) {
            j += ((Fork) it.next().get()).getBytesWritten();
        }
        return j;
    }

    private void addConstructsFinalStateToTaskState(InstrumentedExtractorBase<?, ?> instrumentedExtractorBase, Converter<?, ?, ?, ?> converter, RowLevelPolicyChecker rowLevelPolicyChecker) {
        ConstructState constructState = new ConstructState();
        if (instrumentedExtractorBase != null) {
            constructState.addConstructState(Constructs.EXTRACTOR, new ConstructState(instrumentedExtractorBase.getFinalState()));
        }
        if (converter != null) {
            constructState.addConstructState(Constructs.CONVERTER, new ConstructState(converter.getFinalState()));
        }
        if (rowLevelPolicyChecker != null) {
            constructState.addConstructState(Constructs.ROW_QUALITY_CHECKER, new ConstructState(rowLevelPolicyChecker.getFinalState()));
        }
        int i = 0;
        Iterator<Optional<Fork>> it = this.forks.keySet().iterator();
        while (it.hasNext()) {
            constructState.addConstructState(Constructs.FORK_OPERATOR, new ConstructState(((Fork) it.next().get()).getFinalState()), Integer.toString(i));
            i++;
        }
        constructState.mergeIntoWorkUnitState(this.taskState);
    }

    /* JADX WARN: Finally extract failed */
    @Override // gobblin.runtime.task.TaskIFace
    public void commit() {
        try {
            try {
                ArrayList arrayList = new ArrayList();
                for (Optional<Fork> optional : this.forks.keySet()) {
                    if (optional.isPresent()) {
                        if (!((Fork) optional.get()).isSucceeded()) {
                            arrayList.add(Integer.valueOf(((Fork) optional.get()).getIndex()));
                        } else if (!((Fork) optional.get()).commit()) {
                            arrayList.add(Integer.valueOf(((Fork) optional.get()).getIndex()));
                        }
                    }
                }
                if (arrayList.size() == 0) {
                    this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
                } else {
                    failTask(new ForkException("Fork branches " + arrayList + " failed for task " + this.taskId));
                }
                addConstructsFinalStateToTaskState(this.extractor, this.converter, this.rowChecker);
                this.taskState.setProp("writer.records.written", Long.valueOf(getRecordsWritten()));
                this.taskState.setProp("writer.bytes.written", Long.valueOf(getBytesWritten()));
                submitTaskCommittedEvent();
                try {
                    this.closer.close();
                } catch (Throwable th) {
                    LOG.error("Failed to close all open resources", th);
                }
                for (Map.Entry<Optional<Fork>, Optional<Future<?>>> entry : this.forks.entrySet()) {
                    try {
                        if (entry.getKey().isPresent() && entry.getValue().isPresent()) {
                            try {
                                ((Future) entry.getValue().get()).cancel(true);
                            } catch (Throwable th2) {
                                LOG.error(String.format("Failed to cancel Fork \"%s\"", entry.getKey().get()), th2);
                            }
                        }
                    } catch (Throwable th3) {
                        long currentTimeMillis = System.currentTimeMillis();
                        this.taskState.setEndTime(currentTimeMillis);
                        this.taskState.setTaskDuration(currentTimeMillis - this.startTime);
                        this.taskStateTracker.onTaskCommitCompletion(this);
                        throw th3;
                    }
                }
                try {
                    if (shouldPublishDataInTask()) {
                        publishTaskData();
                        this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
                    }
                    long currentTimeMillis2 = System.currentTimeMillis();
                    this.taskState.setEndTime(currentTimeMillis2);
                    this.taskState.setTaskDuration(currentTimeMillis2 - this.startTime);
                    this.taskStateTracker.onTaskCommitCompletion(this);
                } catch (IOException e) {
                    failTask(e);
                    long currentTimeMillis3 = System.currentTimeMillis();
                    this.taskState.setEndTime(currentTimeMillis3);
                    this.taskState.setTaskDuration(currentTimeMillis3 - this.startTime);
                    this.taskStateTracker.onTaskCommitCompletion(this);
                }
            } catch (Throwable th4) {
                failTask(th4);
                addConstructsFinalStateToTaskState(this.extractor, this.converter, this.rowChecker);
                this.taskState.setProp("writer.records.written", Long.valueOf(getRecordsWritten()));
                this.taskState.setProp("writer.bytes.written", Long.valueOf(getBytesWritten()));
                submitTaskCommittedEvent();
                try {
                    this.closer.close();
                } catch (Throwable th5) {
                    LOG.error("Failed to close all open resources", th5);
                }
                for (Map.Entry<Optional<Fork>, Optional<Future<?>>> entry2 : this.forks.entrySet()) {
                    if (entry2.getKey().isPresent() && entry2.getValue().isPresent()) {
                        try {
                            ((Future) entry2.getValue().get()).cancel(true);
                        } catch (Throwable th6) {
                            LOG.error(String.format("Failed to cancel Fork \"%s\"", entry2.getKey().get()), th6);
                        }
                    }
                }
                try {
                    try {
                        if (shouldPublishDataInTask()) {
                            publishTaskData();
                            this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
                        }
                        long currentTimeMillis4 = System.currentTimeMillis();
                        this.taskState.setEndTime(currentTimeMillis4);
                        this.taskState.setTaskDuration(currentTimeMillis4 - this.startTime);
                        this.taskStateTracker.onTaskCommitCompletion(this);
                    } catch (IOException e2) {
                        failTask(e2);
                        long currentTimeMillis5 = System.currentTimeMillis();
                        this.taskState.setEndTime(currentTimeMillis5);
                        this.taskState.setTaskDuration(currentTimeMillis5 - this.startTime);
                        this.taskStateTracker.onTaskCommitCompletion(this);
                    }
                } catch (Throwable th7) {
                    long currentTimeMillis6 = System.currentTimeMillis();
                    this.taskState.setEndTime(currentTimeMillis6);
                    this.taskState.setTaskDuration(currentTimeMillis6 - this.startTime);
                    this.taskStateTracker.onTaskCommitCompletion(this);
                    throw th7;
                }
            }
        } catch (Throwable th8) {
            addConstructsFinalStateToTaskState(this.extractor, this.converter, this.rowChecker);
            this.taskState.setProp("writer.records.written", Long.valueOf(getRecordsWritten()));
            this.taskState.setProp("writer.bytes.written", Long.valueOf(getBytesWritten()));
            submitTaskCommittedEvent();
            try {
                this.closer.close();
            } catch (Throwable th9) {
                LOG.error("Failed to close all open resources", th9);
            }
            for (Map.Entry<Optional<Fork>, Optional<Future<?>>> entry3 : this.forks.entrySet()) {
                if (entry3.getKey().isPresent() && entry3.getValue().isPresent()) {
                    try {
                        ((Future) entry3.getValue().get()).cancel(true);
                    } catch (Throwable th10) {
                        LOG.error(String.format("Failed to cancel Fork \"%s\"", entry3.getKey().get()), th10);
                    }
                }
            }
            try {
                try {
                    if (shouldPublishDataInTask()) {
                        publishTaskData();
                        this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
                    }
                    long currentTimeMillis7 = System.currentTimeMillis();
                    this.taskState.setEndTime(currentTimeMillis7);
                    this.taskState.setTaskDuration(currentTimeMillis7 - this.startTime);
                    this.taskStateTracker.onTaskCommitCompletion(this);
                } catch (IOException e3) {
                    failTask(e3);
                    long currentTimeMillis8 = System.currentTimeMillis();
                    this.taskState.setEndTime(currentTimeMillis8);
                    this.taskState.setTaskDuration(currentTimeMillis8 - this.startTime);
                    this.taskStateTracker.onTaskCommitCompletion(this);
                    throw th8;
                }
                throw th8;
            } catch (Throwable th11) {
                long currentTimeMillis9 = System.currentTimeMillis();
                this.taskState.setEndTime(currentTimeMillis9);
                this.taskState.setTaskDuration(currentTimeMillis9 - this.startTime);
                this.taskStateTracker.onTaskCommitCompletion(this);
                throw th11;
            }
        }
    }

    protected void submitTaskCommittedEvent() {
        new EventSubmitter.Builder(TaskMetrics.get(this.taskState).getMetricContext(), "gobblin.runtime.task").build().submit("taskCommitted", ImmutableMap.of("taskId", this.taskId, "taskAttemptId", this.taskState.getTaskAttemptId().or("")));
    }

    @Override // gobblin.runtime.task.TaskIFace
    public boolean isSpeculativeExecutionSafe() {
        if ((this.extractor instanceof SpeculativeAttemptAwareConstruct) && !this.extractor.isSpeculativeAttemptSafe()) {
            return false;
        }
        if ((this.converter instanceof SpeculativeAttemptAwareConstruct) && !this.converter.isSpeculativeAttemptSafe()) {
            return false;
        }
        for (Optional<Fork> optional : this.forks.keySet()) {
            if (optional.isPresent() && !((Fork) optional.get()).isSpeculativeExecutionSafe()) {
                return false;
            }
        }
        return true;
    }

    public Task() {
        this.forks = Maps.newLinkedHashMap();
        this.retryCount = new AtomicInteger();
        this.jobId = null;
        this.taskId = null;
        this.taskKey = null;
        this.taskContext = null;
        this.taskState = null;
        this.taskStateTracker = null;
        this.taskExecutor = null;
        this.countDownLatch = null;
        this.converter = null;
        this.extractor = null;
        this.rowChecker = null;
        this.taskMode = null;
        this.watermarkingStrategy = null;
        this.watermarkManager = null;
        this.watermarkTracker = null;
        this.watermarkStorage = null;
        this.closer = null;
        this.recordsPulled = null;
        this.shutdownRequested = null;
        this.shutdownLatch = null;
    }
}
