package gobblin.runtime;

import com.google.common.base.Optional;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import gobblin.configuration.ConfigurationKeys;
import gobblin.converter.Converter;
import gobblin.fork.ForkOperator;
import gobblin.fork.Forker;
import gobblin.qualitychecker.row.RowLevelPolicyChecker;
import gobblin.records.RecordStreamWithMetadata;
import gobblin.runtime.fork.Fork;
import gobblin.source.extractor.Extractor;
import gobblin.source.extractor.StreamingExtractor;
import gobblin.util.ExponentialBackoff;
import gobblin.util.Id;
import gobblin.writer.AcknowledgableWatermark;
import gobblin.writer.FineGrainedWatermarkTracker;
import gobblin.writer.WatermarkManager;
import gobblin.writer.WatermarkStorage;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.schedulers.Schedulers;
import java.beans.ConstructorProperties;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:WEB-INF/lib/gobblin-runtime-0.11.0.jar:gobblin/runtime/StreamModelTaskRunner.class */
public class StreamModelTaskRunner {
    private final Task task;
    private final TaskState taskState;
    private final Closer closer;
    private final TaskContext taskContext;
    private final Extractor extractor;
    private final Converter converter;
    private final RowLevelPolicyChecker rowChecker;
    private final TaskExecutor taskExecutor;
    private final ExecutionModel taskMode;
    private final AtomicBoolean shutdownRequested;
    private final Optional<FineGrainedWatermarkTracker> watermarkTracker;
    private final Optional<WatermarkManager> watermarkManager;
    private final Optional<WatermarkStorage> watermarkStorage;
    private final Map<Optional<Fork>, Optional<Future<?>>> forks;
    private final String watermarkingStrategy;

    /* JADX INFO: Access modifiers changed from: protected */
    public void run() throws Exception {
        ForkOperator forkOperator = (ForkOperator) this.closer.register(this.taskContext.getForkOperator());
        RecordStreamWithMetadata recordStream = this.extractor.recordStream(this.shutdownRequested);
        ConnectableFlowable publish = recordStream.getRecordStream().publish();
        RecordStreamWithMetadata mapRecords = recordStream.withRecordStream(publish).mapRecords(recordEnvelope -> {
            this.task.onRecordExtract();
            return recordEnvelope;
        });
        if (this.task.isStreamingTask()) {
            if (this.watermarkTracker.isPresent()) {
                this.watermarkTracker.get().start();
            }
            this.watermarkManager.get().start();
            ((StreamingExtractor) this.taskContext.getRawSourceExtractor()).start(this.watermarkStorage.get());
            mapRecords = mapRecords.mapRecords(recordEnvelope2 -> {
                AcknowledgableWatermark acknowledgableWatermark = new AcknowledgableWatermark(recordEnvelope2.getWatermark());
                if (this.watermarkTracker.isPresent()) {
                    this.watermarkTracker.get().track(acknowledgableWatermark);
                }
                return recordEnvelope2.withAckableWatermark(acknowledgableWatermark);
            });
        }
        if (this.converter instanceof MultiConverter) {
            Iterator<Converter<?, ?, ?, ?>> it = ((MultiConverter) this.converter).getConverters().iterator();
            while (it.hasNext()) {
                mapRecords = it.next().processStream(mapRecords, this.taskState);
            }
        } else {
            mapRecords = this.converter.processStream(mapRecords, this.taskState);
        }
        Forker.ForkedStream forkStream = new Forker().forkStream(this.rowChecker.processStream(mapRecords, this.taskState), forkOperator, this.taskState);
        boolean z = !this.task.areSingleBranchTasksSynchronous(this.taskContext) || forkStream.getForkedStreams().size() > 1;
        int propAsInt = this.taskState.getPropAsInt(ConfigurationKeys.FORK_RECORD_QUEUE_CAPACITY_KEY, 100);
        for (int i = 0; i < forkStream.getForkedStreams().size(); i++) {
            RecordStreamWithMetadata recordStreamWithMetadata = (RecordStreamWithMetadata) forkStream.getForkedStreams().get(i);
            if (recordStreamWithMetadata != null) {
                if (z) {
                    recordStreamWithMetadata = recordStreamWithMetadata.mapStream(flowable -> {
                        return flowable.observeOn(Schedulers.from(this.taskExecutor.getForkExecutor()), false, propAsInt);
                    });
                }
                Fork fork = new Fork(this.taskContext, recordStreamWithMetadata.getSchema(), forkStream.getForkedStreams().size(), i, this.taskMode);
                fork.consumeRecordStream(recordStreamWithMetadata);
                this.forks.put(Optional.of(fork), Optional.of(Futures.immediateFuture(null)));
                this.task.configureStreamingFork(fork, this.watermarkingStrategy);
            }
        }
        publish.connect();
        if (!ExponentialBackoff.awaitCondition().callable(() -> {
            return Boolean.valueOf(this.forks.keySet().stream().map((v0) -> {
                return v0.get();
            }).allMatch((v0) -> {
                return v0.isDone();
            }));
        }).initialDelay(1000L).maxDelay(1000L).maxWait(Long.valueOf(TimeUnit.MINUTES.toMillis(60L))).await()) {
            throw new TimeoutException("Forks did not finish withing specified timeout.");
        }
    }

    @ConstructorProperties({Id.Task.PREFIX, "taskState", "closer", "taskContext", "extractor", "converter", "rowChecker", "taskExecutor", "taskMode", "shutdownRequested", "watermarkTracker", "watermarkManager", "watermarkStorage", "forks", "watermarkingStrategy"})
    public StreamModelTaskRunner(Task task, TaskState taskState, Closer closer, TaskContext taskContext, Extractor extractor, Converter converter, RowLevelPolicyChecker rowLevelPolicyChecker, TaskExecutor taskExecutor, ExecutionModel executionModel, AtomicBoolean atomicBoolean, Optional<FineGrainedWatermarkTracker> optional, Optional<WatermarkManager> optional2, Optional<WatermarkStorage> optional3, Map<Optional<Fork>, Optional<Future<?>>> map, String str) {
        this.task = task;
        this.taskState = taskState;
        this.closer = closer;
        this.taskContext = taskContext;
        this.extractor = extractor;
        this.converter = converter;
        this.rowChecker = rowLevelPolicyChecker;
        this.taskExecutor = taskExecutor;
        this.taskMode = executionModel;
        this.shutdownRequested = atomicBoolean;
        this.watermarkTracker = optional;
        this.watermarkManager = optional2;
        this.watermarkStorage = optional3;
        this.forks = map;
        this.watermarkingStrategy = str;
    }
}
