package gobblin.runtime;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import gobblin.Constructs;
import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.State;
import gobblin.converter.Converter;
import gobblin.converter.DataConversionException;
import gobblin.instrumented.Instrumented;
import gobblin.metrics.GobblinMetrics;
import gobblin.metrics.Tag;
import gobblin.publisher.TaskPublisher;
import gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
import gobblin.qualitychecker.row.RowLevelPolicyChecker;
import gobblin.runtime.util.TaskMetrics;
import gobblin.state.ConstructState;
import gobblin.util.FinalState;
import gobblin.util.ForkOperatorUtils;
import gobblin.writer.DataWriter;
import gobblin.writer.Destination;
import gobblin.writer.PartitionedDataWriter;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/Fork.class */
public class Fork implements Closeable, Runnable, FinalState {
    // Per-fork logger; its name is suffixed with the branch index in the constructor.
    private final Logger logger;
    // Context of the parent task; source of converters, policy checkers, publisher and writer builder.
    private final TaskContext taskContext;
    // State of the parent task, obtained from the task context.
    private final TaskState taskState;
    // A copy of the task state when there is more than one branch, otherwise the task state itself.
    private final TaskState forkTaskState;
    // Id of the parent task, used in log messages and as the writer id.
    private final String taskId;
    // Total number of branches of the parent task.
    private final int branches;
    // Index of the branch handled by this fork.
    private final int index;
    // Converter chain (a MultiConverter) applied to every record before it is written.
    private final Converter converter;
    // Result of converting the input schema; absent when the converter returned null.
    private final Optional<Object> convertedSchema;
    // Row-level policy checker deciding whether a converted record is written.
    private final RowLevelPolicyChecker rowLevelPolicyChecker;
    // Accumulates the results of the row-level policy checks.
    private final RowLevelPolicyCheckResults rowLevelPolicyCheckingResult;
    // Bounded queue through which putRecord() hands records to processRecords().
    private final BoundedBlockingRecordQueue<Object> recordQueue;
    // Collects the closeable resources of this fork; closed in close().
    private final Closer closer = Closer.create();
    // Writer for this branch; built eagerly in the constructor or lazily on the first record.
    private Optional<DataWriter<Object>> writer = Optional.absent();
    // Set by markParentTaskDone() and close(); lets processRecords() return once the queue yields no record.
    private volatile boolean parentTaskDone = false;
    // Current lifecycle state of this fork; starts as PENDING.
    private final AtomicReference<ForkState> forkState;
    // Key of the metrics tag that carries the fork's branch name.
    private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: gobblin.runtime.Fork$1, reason: invalid class name */
    /* loaded from: input_file:gobblin/runtime/Fork$1.class */
    /**
     * Synthetic helper recovered by the decompiler: a lookup table indexed by
     * {@code TaskPublisher.PublisherState} ordinal that yields the integer case label
     * (1 to 4) used when switching over a publisher state. Entries that are never
     * assigned stay 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$gobblin$publisher$TaskPublisher$PublisherState = new int[TaskPublisher.PublisherState.values().length];

        static {
            // Each assignment is guarded separately so that one missing enum constant
            // (NoSuchFieldError) does not prevent the remaining entries from being filled in.
            try {
                $SwitchMap$gobblin$publisher$TaskPublisher$PublisherState[TaskPublisher.PublisherState.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Intentionally ignored: the entry keeps its default value 0.
            }
            try {
                $SwitchMap$gobblin$publisher$TaskPublisher$PublisherState[TaskPublisher.PublisherState.CLEANUP_FAIL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Intentionally ignored: the entry keeps its default value 0.
            }
            try {
                $SwitchMap$gobblin$publisher$TaskPublisher$PublisherState[TaskPublisher.PublisherState.POLICY_TESTS_FAIL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Intentionally ignored: the entry keeps its default value 0.
            }
            try {
                $SwitchMap$gobblin$publisher$TaskPublisher$PublisherState[TaskPublisher.PublisherState.COMPONENTS_NOT_FINISHED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Intentionally ignored: the entry keeps its default value 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:gobblin/runtime/Fork$ForkState.class */
    /** Lifecycle states of a {@link Fork}. */
    public enum ForkState {
        // Initial state assigned in the constructor, before run() is called.
        PENDING,
        // Entered at the start of run(), while records are being processed.
        RUNNING,
        // Entered when run() finishes processing records without an error.
        SUCCEEDED,
        // Entered when record processing, quality checking or committing fails.
        FAILED,
        // Entered when commit() has committed the data of a succeeded fork.
        COMMITTED
    }

    /**
     * Creates the fork for one branch of a task.
     *
     * <p>Sets up the converter chain, converts the input schema, obtains the row-level policy
     * checker, optionally builds the writer eagerly, creates the bounded record queue and, when
     * metrics are enabled, registers a fork-specific metric context.
     *
     * @param taskContext context of the parent task
     * @param obj the input schema passed to the converter's {@code convertSchema}
     * @param i total number of branches
     * @param i2 index of the branch handled by this fork
     * @throws Exception propagated from schema conversion or from creating any of the constructs
     */
    public Fork(TaskContext taskContext, Object obj, int i, int i2) throws Exception {
        this.logger = LoggerFactory.getLogger(Fork.class.getName() + "-" + i2);
        this.taskContext = taskContext;
        this.taskState = this.taskContext.getTaskState();
        // Only a multi-branch task gets its own copy of the task state; a single branch shares it.
        this.forkTaskState = i > 1 ? new TaskState(this.taskState) : this.taskState;
        this.taskId = this.taskState.getTaskId();
        this.branches = i;
        this.index = i2;
        // Converter and policy checker are registered with the closer so that close() releases them.
        this.converter = this.closer.register(new MultiConverter(this.taskContext.getConverters(this.index, this.forkTaskState)));
        this.convertedSchema = Optional.fromNullable(this.converter.convertSchema(obj, this.taskState));
        this.rowLevelPolicyChecker = this.closer.register(this.taskContext.getRowLevelPolicyChecker(this.index));
        this.rowLevelPolicyCheckingResult = new RowLevelPolicyCheckResults();
        // By default the writer is created lazily when the first record arrives (see processRecords()).
        if (this.taskState.getPropAsBoolean("writer.eager.initialization", false)) {
            buildWriterIfNotPresent();
        }
        // Queue capacity, timeout and timeout unit are configurable; statistics collection is always on.
        this.recordQueue = BoundedBlockingRecordQueue.newBuilder().hasCapacity(this.taskState.getPropAsInt("fork.record.queue.capacity", 100)).useTimeout(this.taskState.getPropAsLong("fork.record.queue.timeout", 1000L)).useTimeoutTimeUnit(TimeUnit.valueOf(this.taskState.getProp("fork.record.queue.timeout.unit", ConfigurationKeys.DEFAULT_FORK_RECORD_QUEUE_TIMEOUT_UNIT))).collectStats().build();
        this.forkState = new AtomicReference<>(ForkState.PENDING);
        if (GobblinMetrics.isEnabled(this.taskState)) {
            // The fork's metric context is created under the task's metric context and tagged with the branch name.
            GobblinMetrics gobblinMetrics = GobblinMetrics.get(getForkMetricsName(taskContext.getTaskMetrics(), this.taskState, i2), taskContext.getTaskMetrics().getMetricContext(), getForkMetricsTags(this.taskState, i2));
            this.closer.register(gobblinMetrics.getMetricContext());
            Instrumented.setMetricContextName(this.taskState, gobblinMetrics.getMetricContext().getName());
        }
    }

    /**
     * Processes queued records until the parent task is done and the queue yields no more records.
     *
     * <p>Moves the fork from PENDING to RUNNING, then to SUCCEEDED on normal completion. Any
     * throwable raised while processing marks the fork FAILED and is logged rather than rethrown.
     * The record queue is cleared in all cases.
     *
     * @throws IllegalStateException if the fork is not in the PENDING state when called
     */
    @Override // java.lang.Runnable
    public void run() {
        compareAndSetForkState(ForkState.PENDING, ForkState.RUNNING);
        try {
            processRecords();
            compareAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED);
        } catch (Throwable failure) {
            this.forkState.set(ForkState.FAILED);
            String message = String.format("Fork %d of task %s failed to process data records", this.index, this.taskId);
            this.logger.error(message, failure);
        } finally {
            // Drop whatever is still queued, on success or failure.
            this.recordQueue.clear();
        }
    }

    /**
     * Collects the final states of the converter, the row-level policy checker and, when it
     * exists and implements {@link FinalState}, the writer.
     *
     * @return a {@link ConstructState} holding one nested state per available construct
     */
    @Override
    public State getFinalState() {
        ConstructState aggregated = new ConstructState();
        if (this.converter != null) {
            aggregated.addConstructState(Constructs.CONVERTER, new ConstructState(this.converter.getFinalState()));
        }
        if (this.rowLevelPolicyChecker != null) {
            aggregated.addConstructState(Constructs.ROW_QUALITY_CHECKER, new ConstructState(this.rowLevelPolicyChecker.getFinalState()));
        }
        if (this.writer.isPresent()) {
            DataWriter<Object> activeWriter = this.writer.get();
            // Not every writer exposes a final state.
            if (activeWriter instanceof FinalState) {
                aggregated.addConstructState(Constructs.WRITER, new ConstructState(((FinalState) activeWriter).getFinalState()));
            }
        }
        return aggregated;
    }

    /**
     * Offers a record to this fork's queue for processing.
     *
     * @param obj the record to enqueue
     * @return the result of the queue's {@code put} (presumably {@code false} when the
     *         record could not be enqueued within the configured timeout; confirm against
     *         {@code BoundedBlockingRecordQueue})
     * @throws InterruptedException if interrupted while waiting on the queue
     * @throws IllegalStateException if this fork has already failed
     */
    public boolean putRecord(Object obj) throws InterruptedException {
        // A plain read is enough to test the state; a compare-and-set of FAILED to FAILED
        // would perform a pointless atomic write for every record.
        if (this.forkState.get() == ForkState.FAILED) {
            throw new IllegalStateException(String.format("Fork %d of task %s has failed and is no longer running", Integer.valueOf(this.index), this.taskId));
        }
        return this.recordQueue.put(obj);
    }

    /**
     * Signals that the parent task will put no more records, so that record processing can
     * finish once the queue yields no further record.
     */
    public void markParentTaskDone() {
        parentTaskDone = true;
    }

    /**
     * Reports the number of records written by this fork's writer to the task state.
     * Does nothing when no writer has been built.
     */
    public void updateRecordMetrics() {
        if (!this.writer.isPresent()) {
            return;
        }
        long recordsWritten = this.writer.get().recordsWritten();
        this.taskState.updateRecordMetrics(recordsWritten, this.index);
    }

    /**
     * Reports the number of bytes written by this fork's writer to the task state.
     * Does nothing when no writer has been built.
     *
     * @throws IOException if the writer fails to report its byte count
     */
    public void updateByteMetrics() throws IOException {
        if (!this.writer.isPresent()) {
            return;
        }
        long bytesWritten = this.writer.get().bytesWritten();
        this.taskState.updateByteMetrics(bytesWritten, this.index);
    }

    /**
     * Runs the task-level quality check and, if it passes, commits the data of this fork.
     *
     * @return {@code true} if the data was committed (state becomes COMMITTED);
     *         {@code false} if the quality check did not pass (state becomes FAILED)
     * @throws Exception any failure during checking or committing, after the fork has been
     *         marked FAILED; rethrown via {@link Throwables#propagate(Throwable)}
     */
    public boolean commit() throws Exception {
        try {
            if (checkDataQuality(this.convertedSchema)) {
                this.logger.info(String.format("Committing data for fork %d of task %s", this.index, this.taskId));
                commitData();
                compareAndSetForkState(ForkState.SUCCEEDED, ForkState.COMMITTED);
                return true;
            }
            this.logger.error(String.format("Fork %d of task %s failed to pass quality checking", this.index, this.taskId));
            compareAndSetForkState(ForkState.SUCCEEDED, ForkState.FAILED);
            return false;
        } catch (Throwable cause) {
            this.logger.error(String.format("Fork %d of task %s failed to commit data", this.index, this.taskId), cause);
            this.forkState.set(ForkState.FAILED);
            // propagate() always throws; the explicit throw tells the compiler so.
            throw Throwables.propagate(cause);
        }
    }

    /**
     * @return the id of the task this fork belongs to
     */
    public String getTaskId() {
        return taskId;
    }

    /**
     * @return the index of the branch handled by this fork
     */
    public int getIndex() {
        return index;
    }

    /**
     * @return the statistics reported by this fork's record queue
     */
    public Optional<BoundedBlockingRecordQueue<Object>.QueueStats> queueStats() {
        return recordQueue.stats();
    }

    /**
     * Tells whether this fork has finished processing its records successfully and has not
     * yet been committed or failed.
     *
     * @return {@code true} if the current state is {@link ForkState#SUCCEEDED}
     */
    public boolean isSucceeded() {
        // A plain read replaces the former compare-and-set of SUCCEEDED to SUCCEEDED, which
        // gave the same answer but performed an unnecessary atomic write.
        return this.forkState.get() == ForkState.SUCCEEDED;
    }

    /**
     * @return a description containing the task id, the branch index and the current fork state
     */
    @Override
    public String toString() {
        // %s is used for every argument so the output matches plain string concatenation.
        return String.format("Fork: TaskId = \"%s\" Index: \"%s\" State: \"%s\"", this.taskId, this.index, this.forkState);
    }

    /**
     * Closes this fork: records its final state in the task state, closes every resource
     * registered with the closer and finally lets the writer, if one was built, clean up.
     *
     * @throws IOException if closing a registered resource or the writer cleanup fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        // Allows processRecords() to return once the queue yields no further record.
        this.parentTaskDone = true;
        // Persist the final fork state under a branch-specific property name.
        this.taskState.setProp(ForkOperatorUtils.getPropertyNameForBranch("fork.state", this.branches, this.index), this.forkState.get().name());
        try {
            this.closer.close();
        } finally {
            // Writer cleanup runs even when closing the registered resources failed.
            if (this.writer.isPresent()) {
                ((DataWriter) this.writer.get()).cleanup();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the number of records written by this fork's writer, or 0 if no writer was built
     */
    public long getRecordsWritten() {
        return this.writer.isPresent() ? this.writer.get().recordsWritten() : 0L;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Best-effort byte count: any failure while asking the writer is reported as 0.
     *
     * @return the number of bytes written by this fork's writer, or 0 if no writer was built
     *         or the writer could not report a value
     */
    public long getBytesWritten() {
        try {
            return this.writer.isPresent() ? this.writer.get().bytesWritten() : 0L;
        } catch (Throwable ignored) {
            // Deliberately swallowed: callers get 0 instead of a failure.
            return 0L;
        }
    }

    /**
     * Builds the writer for this branch, wrapped in a {@link PartitionedDataWriter}.
     *
     * <p>The underlying builder is configured with this branch's destination, output format,
     * writer id (the task id), converted schema (or {@code null} when absent), branch count
     * and branch index.
     *
     * @return a newly created writer
     * @throws IOException if the writer cannot be created
     */
    private DataWriter<Object> buildWriter() throws IOException {
        return new PartitionedDataWriter(
                this.taskContext.getDataWriterBuilder(this.branches, this.index)
                        .writeTo(Destination.of(this.taskContext.getDestinationType(this.branches, this.index), this.taskState))
                        .writeInFormat(this.taskContext.getWriterOutputFormat(this.branches, this.index))
                        .withWriterId(this.taskId)
                        .withSchema(this.convertedSchema.orNull())
                        .withBranches(this.branches)
                        .forBranch(this.index),
                this.taskContext.getTaskState());
    }

    /**
     * Creates the writer on first use and registers it with the closer; later calls are no-ops.
     *
     * @throws IOException if the writer cannot be created
     */
    private void buildWriterIfNotPresent() throws IOException {
        if (!this.writer.isPresent()) {
            DataWriter<Object> created = this.closer.register(buildWriter());
            this.writer = Optional.of(created);
        }
    }

    /**
     * Takes records off the queue, converts them, applies the row-level policies and writes
     * every converted record that passes.
     *
     * <p>The loop ends when the queue yields no record and the parent task has been marked
     * done. The writer is built lazily when the first record arrives.
     *
     * @throws IOException if building the writer or writing a record fails
     * @throws DataConversionException if a record cannot be converted
     * @throws RuntimeException wrapping an {@link InterruptedException} if the thread is
     *         interrupted while waiting on the queue; the interrupt flag is restored first
     */
    private void processRecords() throws IOException, DataConversionException {
        while (true) {
            try {
                Object record = this.recordQueue.get();
                if (record == null) {
                    // Nothing was available: stop only once the parent has finished producing.
                    if (this.parentTaskDone) {
                        return;
                    }
                    continue;
                }
                buildWriterIfNotPresent();
                for (Object converted : this.converter.convertRecord(this.convertedSchema, record, this.taskState)) {
                    if (this.rowLevelPolicyChecker.executePolicies(converted, this.rowLevelPolicyCheckingResult)) {
                        this.writer.get().write(converted);
                    }
                }
            } catch (InterruptedException e) {
                this.logger.warn("Interrupted while trying to get a record off the queue", e);
                // Restore the interrupt status so code further up the stack can still see it
                // after the exception has been wrapped in an unchecked one.
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
        }
    }

    /**
     * Runs the task-level quality policies for this fork and asks the task publisher whether
     * the fork's data may be published.
     *
     * <p>Before checking, the row counts (expected, extracted, written) and the schema are
     * copied into the fork's task state, and the written-record count is also stored in the
     * parent task state under a branch-specific property name.
     *
     * @param optional the converted schema, if any
     * @return {@code true} only if the publisher reports {@code SUCCESS}; {@code false} for
     *         any other publisher state or if the check itself throws
     * @throws Exception declared for callers; failures of the check itself are logged and
     *         reported as {@code false}
     */
    private boolean checkDataQuality(Optional<Object> optional) throws Exception {
        boolean multipleBranches = this.branches > 1;
        if (multipleBranches) {
            // The fork works on a copy of the task state, so bring the row counts across.
            this.forkTaskState.setProp("qualitychecker.rows.expected", this.taskState.getProp("qualitychecker.rows.expected"));
            this.forkTaskState.setProp("qualitychecker.rows.extracted", this.taskState.getProp("qualitychecker.rows.extracted"));
        }

        String writtenCountKey = ForkOperatorUtils.getPropertyNameForBranch("writer.records.written", this.branches, this.index);
        if (this.writer.isPresent()) {
            this.forkTaskState.setProp("qualitychecker.rows.written", Long.valueOf(this.writer.get().recordsWritten()));
            this.taskState.setProp(writtenCountKey, Long.valueOf(this.writer.get().recordsWritten()));
        } else {
            this.forkTaskState.setProp("qualitychecker.rows.written", 0L);
            this.taskState.setProp(writtenCountKey, 0L);
        }

        if (optional.isPresent()) {
            this.forkTaskState.setProp("extract.schema", optional.get().toString());
        }

        try {
            // A single-branch task passes -1 instead of a branch index.
            switch (this.taskContext.getTaskPublisher(
                    this.forkTaskState,
                    this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState, multipleBranches ? this.index : -1).executePolicies(),
                    multipleBranches ? this.index : -1).canPublish()) {
                case SUCCESS:
                    return true;
                case CLEANUP_FAIL:
                    this.logger.error("Cleanup failed for task " + this.taskId);
                    return false;
                case POLICY_TESTS_FAIL:
                    this.logger.error("Not all quality checking passed for task " + this.taskId);
                    return false;
                case COMPONENTS_NOT_FINISHED:
                    this.logger.error("Not all components completed for task " + this.taskId);
                    return false;
                default:
                    return false;
            }
        } catch (Throwable failure) {
            this.logger.error("Failed to check task-level data quality", failure);
            return false;
        }
    }

    /**
     * Commits the writer's data, if a writer exists, and then updates the record and byte
     * metrics when metrics are enabled for the work unit.
     *
     * <p>A failure to update the metrics does not fail the commit; it is only logged.
     *
     * @throws IOException if the writer fails to commit
     */
    private void commitData() throws IOException {
        if (this.writer.isPresent()) {
            this.writer.get().commit();
        }
        try {
            if (GobblinMetrics.isEnabled(this.taskState.getWorkunit())) {
                updateRecordMetrics();
                updateByteMetrics();
            }
        } catch (IOException e) {
            // Pass the exception to the logger so the cause and stack trace are not lost.
            this.logger.error("Failed to update byte-level metrics of task " + this.taskId, e);
        }
    }

    /**
     * Atomically moves the fork from one state to another.
     *
     * @param expected the state the fork must currently be in
     * @param updated the state to move to
     * @throws IllegalStateException if the fork is not in the expected state
     */
    private void compareAndSetForkState(ForkState expected, ForkState updated) {
        if (this.forkState.compareAndSet(expected, updated)) {
            return;
        }
        throw new IllegalStateException(String.format("Expected fork state %s; actual fork state %s", expected.name(), this.forkState.get().name()));
    }

    /**
     * Builds the metrics tags of a fork: a single tag carrying the fork's metrics id.
     *
     * @param state state consulted for a configured branch name
     * @param branchIndex index of the branch
     * @return an immutable one-element list of tags
     */
    private static List<Tag<?>> getForkMetricsTags(State state, int branchIndex) {
        String metricsId = getForkMetricsId(state, branchIndex);
        return ImmutableList.of(new Tag(FORK_METRICS_BRANCH_NAME_KEY, metricsId));
    }

    /**
     * Builds the name of a fork's metrics: the task metrics name, a dot, and the fork's metrics id.
     *
     * @param taskMetrics metrics of the parent task
     * @param state state consulted for a configured branch name
     * @param branchIndex index of the branch
     * @return the fork metrics name
     */
    private static String getForkMetricsName(TaskMetrics taskMetrics, State state, int branchIndex) {
        String metricsId = getForkMetricsId(state, branchIndex);
        return taskMetrics.getName() + "." + metricsId;
    }

    /**
     * Resolves the metrics id of a branch: the value of the property
     * {@code fork.branch.name.<index>} if set, otherwise {@code fork_<index>}.
     *
     * @param state state holding the optional branch-name property
     * @param branchIndex index of the branch
     * @return the metrics id of the branch
     */
    private static String getForkMetricsId(State state, int branchIndex) {
        String propertyKey = "fork.branch.name." + branchIndex;
        String fallbackName = "fork_" + branchIndex;
        return state.getProp(propertyKey, fallbackName);
    }
}
