package gobblin.runtime;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import gobblin.Constructs;
import gobblin.configuration.WorkUnitState;
import gobblin.converter.Converter;
import gobblin.fork.CopyNotSupportedException;
import gobblin.fork.Copyable;
import gobblin.fork.ForkOperator;
import gobblin.instrumented.extractor.InstrumentedExtractorBase;
import gobblin.instrumented.extractor.InstrumentedExtractorDecorator;
import gobblin.publisher.SingleTaskDataPublisher;
import gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
import gobblin.qualitychecker.row.RowLevelPolicyChecker;
import gobblin.source.extractor.JobCommitPolicy;
import gobblin.state.ConstructState;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/Task.class */
public class Task implements Runnable {
    /** Logger shared by all task instances. */
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    /** Job identifier, read from the task state when the task is constructed. */
    private final String jobId;
    /** Task identifier, read from the task state when the task is constructed; also used as the {@link #toString()} value. */
    private final String taskId;
    /** Supplies the extractor, converters, fork operator and row-level policy checker used by {@link #run()}. */
    private final TaskContext taskContext;
    /** Mutable state of this task (working state, timings, properties), obtained from the task context. */
    private final TaskState taskState;
    /** Notified through {@code onTaskCompletion(this)} at the end of {@link #run()}. */
    private final TaskStateTracker taskStateTracker;
    /** Optional latch that {@link #markTaskCompletion()} counts down when present. */
    private final Optional<CountDownLatch> countDownLatch;
    /** Completion service, built on the task executor's fork executor, to which forks are submitted and from which {@link #run()} takes their completions. */
    private final CompletionService forkCompletionService;
    /** One entry per fork branch, in branch order; absent for branches whose schema flag was false. Cleared at the start of {@link #run()}. */
    private final List<Optional<Fork>> forks = Lists.newArrayList();
    /** Retry counter, bumped by {@link #incrementRetryCount()} and written to the task state by {@link #markTaskCompletion()}. */
    private final AtomicInteger retryCount = new AtomicInteger();

    /**
     * Creates a task bound to the given context.
     *
     * @param taskContext context providing the task state and the constructs used by {@link #run()}
     * @param taskStateTracker tracker notified when the task completes
     * @param taskExecutor executor whose fork executor backs the fork completion service
     * @param optional optional latch counted down by {@link #markTaskCompletion()}
     */
    public Task(TaskContext taskContext, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, Optional<CountDownLatch> optional) {
        TaskState state = taskContext.getTaskState();
        this.taskContext = taskContext;
        this.taskStateTracker = taskStateTracker;
        this.countDownLatch = optional;
        this.taskState = state;
        // Identifiers are cached so the getters do not depend on later task-state changes.
        this.jobId = state.getJobId();
        this.taskId = state.getTaskId();
        this.forkCompletionService = new ExecutorCompletionService(taskExecutor.getForkExecutor());
    }

    /**
     * Runs the task end to end.
     *
     * <p>The main flow marks the task RUNNING, creates the extractor, converter and fork operator,
     * starts one {@link Fork} per selected branch, pulls records from the extractor, converts them and
     * hands them to {@link #processRecord}, then waits for the forks and commits them. The working
     * state becomes SUCCESSFUL only if every present fork succeeded and committed; otherwise FAILED.
     * Any {@link Throwable} raised by the main flow is recorded through {@link #failTask(Throwable)}.
     *
     * <p>Cleanup runs exactly once in a {@code finally} block, whatever the outcome: construct final
     * states and writer counters are copied into the task state, registered resources are closed,
     * data is published when {@link #shouldPublishDataInTask()} says so, and finally the end time and
     * duration are recorded and the task state tracker is notified of completion.
     */
    @Override // java.lang.Runnable
    public void run() {
        final long startTime = System.currentTimeMillis();
        this.taskState.setStartTime(startTime);
        this.taskState.setWorkingState(WorkUnitState.WorkingState.RUNNING);
        // A repeated invocation of run() on the same instance must start with an empty fork list.
        this.forks.clear();

        Closer closer = Closer.create();
        // Declared outside the try so the finally block can report the final state of whichever
        // constructs had already been created when a failure occurred.
        InstrumentedExtractorBase<?, ?> extractor = null;
        Converter<?, ?, ?, ?> converter = null;
        RowLevelPolicyChecker rowChecker = null;
        try {
            extractor = (InstrumentedExtractorBase) closer.register(new InstrumentedExtractorDecorator(this.taskState, this.taskContext.getExtractor()));
            converter = (Converter) closer.register(new MultiConverter(this.taskContext.getConverters()));
            ForkOperator forkOperator = (ForkOperator) closer.register(this.taskContext.getForkOperator());
            forkOperator.init(this.taskState);
            int branches = forkOperator.getBranches(this.taskState);
            this.taskState.setProp("fork.branches", Integer.valueOf(branches));

            Object schema = converter.convertSchema(extractor.getSchema(), this.taskState);
            List<Boolean> forkedSchemas = forkOperator.forkSchema(this.taskState, schema);
            if (forkedSchemas.size() != branches) {
                throw new ForkBranchMismatchException(String.format("Number of forked schemas [%d] is not equal to number of branches [%d]", Integer.valueOf(forkedSchemas.size()), Integer.valueOf(branches)));
            }
            // A schema that goes to more than one branch has to be copied per branch.
            if (inMultipleBranches(forkedSchemas) && !(schema instanceof Copyable)) {
                throw new CopyNotSupportedException(schema + " is not copyable");
            }

            // Create and start one fork per selected branch; unselected branches keep an absent placeholder
            // so that list indices line up with branch indices.
            for (int branch = 0; branch < branches; branch++) {
                if (forkedSchemas.get(branch).booleanValue()) {
                    Fork fork = (Fork) closer.register(new Fork(this.taskContext, schema instanceof Copyable ? ((Copyable) schema).copy() : schema, branches, branch));
                    this.forkCompletionService.submit(fork, fork);
                    this.forks.add(Optional.of(fork));
                } else {
                    this.forks.add(Optional.absent());
                }
            }

            rowChecker = (RowLevelPolicyChecker) closer.register(this.taskContext.getRowLevelPolicyChecker());
            RowLevelPolicyCheckResults rowResults = new RowLevelPolicyCheckResults();

            // Pull records until the extractor returns null; each input record may convert to several outputs.
            long recordsPulled = 0;
            Object record;
            while ((record = extractor.readRecord((Object) null)) != null) {
                recordsPulled++;
                Iterator converted = converter.convertRecord(schema, record, this.taskState).iterator();
                while (converted.hasNext()) {
                    processRecord(converted.next(), forkOperator, rowChecker, rowResults, branches);
                }
            }

            LOG.info("Extracted " + recordsPulled + " data records");
            LOG.info("Row quality checker finished with results: " + rowResults.getResults());
            this.taskState.setProp("qualitychecker.rows.extracted", Long.valueOf(recordsPulled));
            this.taskState.setProp("qualitychecker.rows.expected", Long.valueOf(extractor.getExpectedRecordCount()));

            // Tell every fork that no more records are coming.
            for (Optional<Fork> fork : this.forks) {
                if (fork.isPresent()) {
                    fork.get().markParentTaskDone();
                }
            }

            // Take one completion per present fork.
            for (Optional<Fork> fork : this.forks) {
                if (fork.isPresent()) {
                    try {
                        this.forkCompletionService.take();
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for callers; the remaining takes are still attempted.
                        Thread.currentThread().interrupt();
                    }
                }
            }

            // commit() is attempted only on forks that report success; every present fork is visited.
            boolean allForksSucceeded = true;
            for (Optional<Fork> fork : this.forks) {
                if (fork.isPresent() && (!fork.get().isSucceeded() || !fork.get().commit())) {
                    allForksSucceeded = false;
                }
            }
            if (allForksSucceeded) {
                this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
            } else {
                LOG.error(String.format("Not all forks of task %s succeeded", this.taskId));
                this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED);
            }
        } catch (Throwable t) {
            failTask(t);
        } finally {
            addConstructsFinalStateToTaskState(extractor, converter, rowChecker);
            this.taskState.setProp("writer.records.written", Long.valueOf(getRecordsWritten()));
            this.taskState.setProp("writer.bytes.written", Long.valueOf(getBytesWritten()));

            try {
                closer.close();
            } catch (Throwable t) {
                LOG.error("Failed to close all open resources", t);
            }

            try {
                // Publishing happens only after the registered resources (including forks) were closed.
                if (shouldPublishDataInTask()) {
                    publishTaskData();
                    this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
                }
            } catch (IOException e) {
                failTask(e);
            } finally {
                // Runs exactly once, after any publish failure has already been recorded.
                long endTime = System.currentTimeMillis();
                this.taskState.setEndTime(endTime);
                this.taskState.setTaskDuration(endTime - startTime);
                this.taskStateTracker.onTaskCompletion(this);
            }
        }
    }

    /**
     * Records a task failure: logs the error, sets the working state to FAILED and stores the
     * stack trace in the task state under {@code task.failure.exception}.
     *
     * @param th the failure cause
     */
    private void failTask(Throwable th) {
        String message = String.format("Task %s failed", this.taskId);
        LOG.error(message, th);
        this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED);
        String stackTrace = Throwables.getStackTraceAsString(th);
        this.taskState.setProp("task.failure.exception", stackTrace);
    }

    /**
     * Decides whether this task publishes its own data.
     *
     * <p>Returns {@code false} when {@code publish.data.at.job.level} is true (its default).
     * Otherwise the job commit policy decides: {@code COMMIT_ON_PARTIAL_SUCCESS} always publishes,
     * {@code COMMIT_SUCCESSFUL_TASKS} publishes only if the working state is SUCCESSFUL, and any
     * other policy leaves publishing to the job level.
     *
     * @return {@code true} if the task should publish its data itself
     */
    private boolean shouldPublishDataInTask() {
        boolean publishAtJobLevel = this.taskState.getPropAsBoolean("publish.data.at.job.level", true);
        if (publishAtJobLevel) {
            LOG.info(String.format("%s is true. Will publish data at the job level.", "publish.data.at.job.level"));
            return false;
        }

        JobCommitPolicy policy = JobCommitPolicy.getCommitPolicy(this.taskState);
        if (policy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS) {
            return true;
        }
        if (policy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS) {
            WorkUnitState.WorkingState current = this.taskState.getWorkingState();
            return current == WorkUnitState.WorkingState.SUCCESSFUL;
        }

        LOG.info("Will publish data at the job level with job commit policy: " + policy);
        return false;
    }

    /**
     * Publishes this task's data with the publisher class named by the {@code data.publisher.type}
     * property (defaulting to {@code gobblin.publisher.BaseDataPublisher}).
     *
     * <p>Any failure is stored on the task state via {@code setTaskFailureException} and rethrown
     * through the {@link Closer}; the publisher is always closed in the {@code finally} block.
     *
     * @throws IOException if publishing or closing the publisher fails
     */
    private void publishTaskData() throws IOException {
        String publisherClassName = this.taskState.getProp("data.publisher.type", "gobblin.publisher.BaseDataPublisher");
        Closer closer = Closer.create();
        try {
            SingleTaskDataPublisher publisher = closer.register(SingleTaskDataPublisher.getInstance(Class.forName(publisherClassName), this.taskState));
            LOG.info("Publishing data from task " + this.taskId);
            publisher.publish(this.taskState);
        } catch (ClassCastException e) {
            // Report the configured class name rather than the name of the configuration key.
            LOG.error(String.format("To publish data in task, the publisher class (%s) must extend %s", publisherClassName, SingleTaskDataPublisher.class.getSimpleName()), e);
            this.taskState.setTaskFailureException(e);
            throw closer.rethrow(e);
        } catch (Throwable t) {
            this.taskState.setTaskFailureException(t);
            throw closer.rethrow(t);
        } finally {
            closer.close();
        }
    }

    /**
     * @return the job ID captured from the task state at construction
     */
    public String getJobId() {
        return jobId;
    }

    /**
     * @return the task ID captured from the task state at construction
     */
    public String getTaskId() {
        return taskId;
    }

    /**
     * @return the context this task was created with
     */
    public TaskContext getTaskContext() {
        return taskContext;
    }

    /**
     * @return the state object of this task (the same instance the task updates while running)
     */
    public TaskState getTaskState() {
        return taskState;
    }

    /**
     * @return an immutable snapshot of the fork list, one entry per branch (absent for branches without a fork)
     */
    public List<Optional<Fork>> getForks() {
        List<Optional<Fork>> snapshot = ImmutableList.copyOf(forks);
        return snapshot;
    }

    /**
     * Asks every present fork to update its record-level metrics.
     */
    public void updateRecordMetrics() {
        Iterator<Optional<Fork>> cursor = this.forks.iterator();
        while (cursor.hasNext()) {
            Optional<Fork> candidate = cursor.next();
            if (!candidate.isPresent()) {
                continue;
            }
            candidate.get().updateRecordMetrics();
        }
    }

    /**
     * Asks every present fork to update its byte-level metrics.
     *
     * <p>An {@link IOException} from any fork is logged and stops the update; forks later in the
     * list are not visited in that case.
     */
    public void updateByteMetrics() {
        try {
            Iterator<Optional<Fork>> cursor = this.forks.iterator();
            while (cursor.hasNext()) {
                Optional<Fork> candidate = cursor.next();
                if (!candidate.isPresent()) {
                    continue;
                }
                candidate.get().updateByteMetrics();
            }
        } catch (IOException e) {
            LOG.error("Failed to update byte-level metrics for task " + this.taskId, e);
        }
    }

    /**
     * Atomically adds one to the retry counter.
     */
    public void incrementRetryCount() {
        retryCount.incrementAndGet();
    }

    /**
     * @return the current value of the retry counter
     */
    public int getRetryCount() {
        return retryCount.get();
    }

    /**
     * Marks the task as completed: counts down the latch when one was supplied, then stores the
     * retry count in the task state under {@code task.retries}.
     */
    public void markTaskCompletion() {
        Optional<CountDownLatch> latch = this.countDownLatch;
        if (latch.isPresent()) {
            latch.get().countDown();
        }
        int retries = this.retryCount.get();
        this.taskState.setProp("task.retries", Integer.valueOf(retries));
    }

    /**
     * @return the task ID, used as the textual representation of this task
     */
    @Override
    public String toString() {
        return this.taskId;
    }

    /**
     * Runs the row-level policies on one converted record and, if they pass, delivers the record
     * to every fork selected for it by the fork operator.
     *
     * <p>Delivery is retried until each selected fork has accepted the record; branches without a
     * fork or not selected for this record are treated as delivered. A {@link Copyable} record is
     * copied for every delivery attempt.
     *
     * @param obj the converted record
     * @param forkOperator operator that chooses the branches for the record
     * @param rowLevelPolicyChecker checker whose policies gate the record
     * @param rowLevelPolicyCheckResults accumulator for policy results
     * @param i the number of fork branches
     * @throws ForkBranchMismatchException if the operator returns a selection whose size differs from {@code i}
     * @throws CopyNotSupportedException if the record goes to several branches but is not {@link Copyable}
     * @throws Exception any other failure raised by the checker, the operator or a fork
     */
    private void processRecord(Object obj, ForkOperator forkOperator, RowLevelPolicyChecker rowLevelPolicyChecker, RowLevelPolicyCheckResults rowLevelPolicyCheckResults, int i) throws Exception {
        if (!rowLevelPolicyChecker.executePolicies(obj, rowLevelPolicyCheckResults)) {
            // Record rejected by the row-level policies: nothing to deliver.
            return;
        }

        List<Boolean> branchSelection = forkOperator.forkDataRecord(this.taskState, obj);
        if (branchSelection.size() != i) {
            throw new ForkBranchMismatchException(String.format("Number of forked data records [%d] is not equal to number of branches [%d]", Integer.valueOf(branchSelection.size()), Integer.valueOf(i)));
        }

        boolean copyable = obj instanceof Copyable;
        if (inMultipleBranches(branchSelection) && !copyable) {
            throw new CopyNotSupportedException(obj + " is not copyable");
        }

        // delivered[b] becomes true once branch b needs no further attempt.
        boolean[] delivered = new boolean[i];
        boolean allDelivered;
        do {
            allDelivered = true;
            for (int branch = 0; branch < i; branch++) {
                if (delivered[branch]) {
                    continue;
                }
                Optional<Fork> target = this.forks.get(branch);
                if (!target.isPresent() || !branchSelection.get(branch).booleanValue()) {
                    delivered[branch] = true;
                    continue;
                }
                Object payload = copyable ? ((Copyable) obj).copy() : obj;
                boolean accepted = target.get().putRecord(payload);
                delivered[branch] = accepted;
                if (!accepted) {
                    // The fork did not take the record this round; try again on the next pass.
                    allDelivered = false;
                }
            }
        } while (!allDelivered);
    }

    /**
     * Tells whether more than one branch is selected.
     *
     * @param list per-branch selection flags
     * @return {@code true} if at least two flags are true
     */
    private boolean inMultipleBranches(List<Boolean> list) {
        int selected = 0;
        for (Boolean flag : list) {
            if (flag.booleanValue()) {
                selected++;
                if (selected > 1) {
                    // Two selected branches are enough; no need to scan the rest.
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * @return the sum of the records-written counts reported by all present forks
     */
    private long getRecordsWritten() {
        long total = 0L;
        Iterator<Optional<Fork>> cursor = this.forks.iterator();
        while (cursor.hasNext()) {
            Optional<Fork> candidate = cursor.next();
            if (!candidate.isPresent()) {
                continue;
            }
            total += candidate.get().getRecordsWritten();
        }
        return total;
    }

    /**
     * @return the sum of the bytes-written counts reported by all present forks
     */
    private long getBytesWritten() {
        long total = 0L;
        Iterator<Optional<Fork>> cursor = this.forks.iterator();
        while (cursor.hasNext()) {
            Optional<Fork> candidate = cursor.next();
            if (!candidate.isPresent()) {
                continue;
            }
            total += candidate.get().getBytesWritten();
        }
        return total;
    }

    /**
     * Collects the final state of the given constructs and of every present fork, then merges the
     * result into the task state.
     *
     * <p>Each construct argument may be {@code null}, in which case it is skipped. Fork states are
     * keyed by the fork's position in the fork list.
     *
     * @param instrumentedExtractorBase the extractor, or {@code null}
     * @param converter the converter, or {@code null}
     * @param rowLevelPolicyChecker the row-level policy checker, or {@code null}
     */
    private void addConstructsFinalStateToTaskState(InstrumentedExtractorBase<?, ?> instrumentedExtractorBase, Converter<?, ?, ?, ?> converter, RowLevelPolicyChecker rowLevelPolicyChecker) {
        ConstructState aggregated = new ConstructState();

        if (instrumentedExtractorBase != null) {
            ConstructState extractorState = new ConstructState(instrumentedExtractorBase.getFinalState());
            aggregated.addConstructState(Constructs.EXTRACTOR, extractorState);
        }
        if (converter != null) {
            ConstructState converterState = new ConstructState(converter.getFinalState());
            aggregated.addConstructState(Constructs.CONVERTER, converterState);
        }
        if (rowLevelPolicyChecker != null) {
            ConstructState checkerState = new ConstructState(rowLevelPolicyChecker.getFinalState());
            aggregated.addConstructState(Constructs.ROW_QUALITY_CHECKER, checkerState);
        }

        // The position counter advances for absent entries too, so the key matches the branch index.
        int position = 0;
        for (Iterator<Optional<Fork>> cursor = this.forks.iterator(); cursor.hasNext(); position++) {
            Optional<Fork> candidate = cursor.next();
            if (candidate.isPresent()) {
                ConstructState forkState = new ConstructState(candidate.get().getFinalState());
                aggregated.addConstructState(Constructs.FORK_OPERATOR, forkState, Integer.toString(position));
            }
        }

        aggregated.mergeIntoWorkUnitState(this.taskState);
    }
}
