package gobblin.runtime.fork;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import gobblin.configuration.ConfigurationKeys;
import gobblin.converter.DataConversionException;
import gobblin.runtime.BoundedBlockingRecordQueue;
import gobblin.runtime.ExecutionModel;
import gobblin.runtime.TaskContext;
import gobblin.runtime.TaskState;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/fork/AsynchronousFork.class */
/**
 * A {@link Fork} that hands records over through a {@link BoundedBlockingRecordQueue}:
 * {@link #putRecordImpl(Object)} enqueues a record and {@link #processRecords()} keeps
 * dequeuing and processing records until {@link #processRecord()} reports that it should stop.
 *
 * <p>The queue's capacity, timeout and timeout unit are read from the task state at
 * construction time, and statistics collection is enabled on the queue.
 */
public class AsynchronousFork extends Fork {
    private static final Logger log = LoggerFactory.getLogger(AsynchronousFork.class);

    /** Task-state property holding the queue capacity, and the value used when it is absent. */
    private static final String QUEUE_CAPACITY_KEY = "fork.record.queue.capacity";
    private static final int DEFAULT_QUEUE_CAPACITY = 100;

    /** Task-state property holding the queue timeout, and the value used when it is absent. */
    private static final String QUEUE_TIMEOUT_KEY = "fork.record.queue.timeout";
    private static final long DEFAULT_QUEUE_TIMEOUT = 1000L;

    /** Task-state property holding the name of the {@link TimeUnit} of the queue timeout. */
    private static final String QUEUE_TIMEOUT_UNIT_KEY = "fork.record.queue.timeout.unit";

    private final BoundedBlockingRecordQueue<Object> recordQueue;

    /**
     * Creates the fork and builds its record queue from the task state of {@code taskContext}.
     *
     * @param taskContext context whose {@link TaskState} supplies the queue configuration;
     *                    also forwarded to the {@link Fork} constructor
     * @param obj forwarded unchanged to the {@link Fork} constructor
     * @param i forwarded unchanged to the {@link Fork} constructor
     * @param i2 forwarded unchanged to the {@link Fork} constructor
     * @param executionModel forwarded unchanged to the {@link Fork} constructor
     * @throws Exception if the superclass constructor fails
     */
    public AsynchronousFork(TaskContext taskContext, Object obj, int i, int i2, ExecutionModel executionModel) throws Exception {
        super(taskContext, obj, i, i2, executionModel);
        TaskState taskState = taskContext.getTaskState();
        // TimeUnit.valueOf throws IllegalArgumentException if the configured unit is not a TimeUnit name.
        this.recordQueue = BoundedBlockingRecordQueue.newBuilder()
                .hasCapacity(taskState.getPropAsInt(QUEUE_CAPACITY_KEY, DEFAULT_QUEUE_CAPACITY))
                .useTimeout(taskState.getPropAsLong(QUEUE_TIMEOUT_KEY, DEFAULT_QUEUE_TIMEOUT))
                .useTimeoutTimeUnit(TimeUnit.valueOf(
                        taskState.getProp(QUEUE_TIMEOUT_UNIT_KEY, ConfigurationKeys.DEFAULT_FORK_RECORD_QUEUE_TIMEOUT_UNIT)))
                .collectStats()
                .build();
    }

    /**
     * Returns whatever statistics the underlying record queue reports.
     *
     * @return the result of {@code recordQueue.stats()}
     */
    @Override // gobblin.runtime.fork.Fork
    public Optional<BoundedBlockingRecordQueue<Object>.QueueStats> queueStats() {
        return this.recordQueue.stats();
    }

    /**
     * Repeatedly calls {@link #processRecord()} until it returns {@code false}.
     *
     * @throws IOException if thrown while processing a record
     * @throws DataConversionException if thrown while processing a record
     */
    @Override // gobblin.runtime.fork.Fork
    protected void processRecords() throws IOException, DataConversionException {
        while (processRecord()) {
            // All the work happens in processRecord(); loop until it asks us to stop.
        }
    }

    /**
     * Offers a record to the queue.
     *
     * @param obj the record to enqueue
     * @return the value returned by the queue's {@code put}
     *         (presumably {@code false} when the put timed out; verify against the queue implementation)
     * @throws InterruptedException if the queue's {@code put} throws it
     */
    @Override // gobblin.runtime.fork.Fork
    protected boolean putRecordImpl(Object obj) throws InterruptedException {
        return this.recordQueue.put(obj);
    }

    /**
     * Takes one item off the queue and processes it if it is a real record.
     *
     * <p>An item that is {@code null} or is {@link Fork#SHUTDOWN_RECORD} is not processed; in that
     * case the method reports whether the parent task is still running.
     *
     * @return {@code true} if the caller should keep polling; {@code false} once a non-record item
     *         was received and {@code isParentTaskDone()} is {@code true}
     * @throws IOException if thrown while processing the record
     * @throws DataConversionException if thrown while processing the record
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *         interrupted while taking from the queue; the interrupt status is restored first
     */
    boolean processRecord() throws IOException, DataConversionException {
        try {
            Object record = this.recordQueue.get();
            if (record != null && record != Fork.SHUTDOWN_RECORD) {
                processRecord(record);
                return true;
            }
            // Nothing to process: stop only once the parent task has finished.
            return !isParentTaskDone();
        } catch (InterruptedException e) {
            log.warn("Interrupted while trying to get a record off the queue", e);
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            // propagate() always throws; "throw" makes that explicit to the compiler and the reader.
            throw Throwables.propagate(e);
        }
    }
}
