package org.apache.gobblin.source.extractor.extract;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.gobblin.ack.Ackable;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.MetricContextUtils;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.runtime.StateStoreBasedWatermarkStorage;
import org.apache.gobblin.source.extractor.CheckpointableWatermark;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.stream.FlushControlMessage;
import org.apache.gobblin.stream.FlushRecordEnvelope;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.stream.StreamEntity;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.LastWatermarkTracker;
import org.apache.gobblin.writer.WatermarkStorage;
import org.apache.gobblin.writer.WatermarkTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/source/extractor/extract/FlushingExtractor.class */
/**
 * Base class for streaming extractors that periodically inject a {@link FlushControlMessage} into the
 * record stream and, once that flush has been acknowledged, run a commit sequence.
 *
 * <p>A flush is injected when more than {@code stream.flush.interval.secs} (default 60) seconds have
 * elapsed since the previous one. A flush is also injected when {@link #readRecordEnvelopeImpl()}
 * returns a {@link FlushRecordEnvelope}.
 *
 * <p>After the flush is acked, the next call to {@link #readStreamEntityImpl()} runs, in order:
 * <ol>
 *   <li>the configured pre-commit steps;</li>
 *   <li>{@link #publishTaskOutput()};</li>
 *   <li>{@link #onFlushAck()}, which checkpoints watermarks by default;</li>
 *   <li>the configured post-commit steps.</li>
 * </ol>
 *
 * @param <S> type parameter forwarded to {@code EventBasedExtractor} (presumably the schema type)
 * @param <D> type of the records carried in {@link RecordEnvelope}s
 */
public abstract class FlushingExtractor<S, D> extends EventBasedExtractor<S, D> {
    public static final String GOBBLIN_EXTRACTOR_PRECOMMIT_STEPS = "gobblin.extractor.precommit.steps";
    public static final String GOBBLIN_EXTRACTOR_POSTCOMMIT_STEPS = "gobblin.extractor.postcommit.steps";
    public static final String FLUSH_INTERVAL_SECONDS_KEY = "stream.flush.interval.secs";
    public static final String FLUSH_DATA_PUBLISHER_CLASS = "flush.data.publisher.class";
    public static final String DEFAULT_FLUSH_DATA_PUBLISHER_CLASS = "org.apache.gobblin.publisher.BaseDataPublisher";
    public static final String WATERMARK_COMMIT_TIME_METRIC = "state.store.metrics.watermarkCommitTime";
    public static final String COMMIT_STEP_METRIC_PREFIX = "commit.step.";

    /**
     * Watermarks known to be committed, keyed by the same string ids used by the watermark storage/tracker.
     * Kept as a mutable map because {@link #checkPointWatermarks()} adds entries to it.
     */
    protected Map<String, CheckpointableWatermark> lastCommittedWatermarks;
    private final List<String> preCommitSteps;
    private final List<String> postCommitSteps;
    /** Commit steps already created via {@link #initCommitStep}, keyed by alias, so each is built only once. */
    private final Map<String, CommitStep> commitStepMap;
    /** Duration in ms of the most recent watermark commit; exposed as a gauge. */
    private final AtomicLong watermarkCommitTime;
    /** Duration in ms of the most recent run of each pre-commit step (index-aligned with preCommitSteps). */
    private final List<AtomicLong> preCommitStepTimes;
    /** Duration in ms of the most recent run of each post-commit step (index-aligned with postCommitSteps). */
    private final List<AtomicLong> postCommitStepTimes;
    protected Config config;
    private Optional<WatermarkStorage> watermarkStorage;
    protected WatermarkTracker watermarkTracker;
    protected Long flushIntervalMillis;
    protected Long timeOfLastFlush;
    private FlushAckable lastFlushAckable;
    private boolean hasOutstandingFlush;
    private Optional<DataPublisher> flushPublisher;
    protected WorkUnitState workUnitState;
    private static final Logger log = LoggerFactory.getLogger(FlushingExtractor.class);
    public static final Long DEFAULT_FLUSH_INTERVAL_SECONDS = 60L;

    /**
     * {@link Ackable} attached to every injected flush message. The extractor blocks in
     * {@link #waitForAck()} before reading further, so the commit sequence only runs once the flush
     * has been acked or nacked.
     */
    public static class FlushAckable implements Ackable {
        // Written before countDown() and read after await(); the latch provides the happens-before edge.
        private Throwable error;
        private final CountDownLatch processed = new CountDownLatch(1);

        /** Marks the flush as successfully processed. */
        public void ack() {
            this.processed.countDown();
        }

        /** Marks the flush as failed with {@code th}; {@link #waitForAck()} will return it. */
        public void nack(Throwable th) {
            this.error = th;
            this.processed.countDown();
        }

        /**
         * Blocks until {@link #ack()} or {@link #nack(Throwable)} has been called.
         *
         * @return the error passed to {@code nack}, or {@code null} if the flush was acked
         * @throws RuntimeException if the waiting thread is interrupted (the interrupt flag is restored)
         */
        public Throwable waitForAck() {
            try {
                this.processed.await();
                return this.error;
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted while waiting for ack", e);
            }
        }
    }

    /**
     * Reads flush/commit configuration from the work unit, instantiates the flush publisher and
     * registers the watermark-commit and per-commit-step timing gauges.
     *
     * @param workUnitState state of the work unit being extracted
     * @throws RuntimeException if the configured flush publisher class cannot be instantiated
     */
    public FlushingExtractor(WorkUnitState workUnitState) {
        super(workUnitState);
        this.commitStepMap = Maps.newHashMap();
        this.watermarkCommitTime = new AtomicLong(0L);
        this.timeOfLastFlush = System.currentTimeMillis();
        this.hasOutstandingFlush = false;
        this.flushPublisher = Optional.absent();
        this.workUnitState = workUnitState;
        this.config = ConfigFactory.parseProperties(workUnitState.getProperties());
        this.flushIntervalMillis =
            ConfigUtils.getLong(this.config, FLUSH_INTERVAL_SECONDS_KEY, DEFAULT_FLUSH_INTERVAL_SECONDS) * 1000L;
        this.watermarkTracker = new LastWatermarkTracker(false);
        this.watermarkStorage = Optional.<WatermarkStorage>of(new StateStoreBasedWatermarkStorage(workUnitState));
        this.preCommitSteps = ConfigUtils.getStringList(this.config, GOBBLIN_EXTRACTOR_PRECOMMIT_STEPS);
        this.postCommitSteps = ConfigUtils.getStringList(this.config, GOBBLIN_EXTRACTOR_POSTCOMMIT_STEPS);
        this.preCommitStepTimes = newStepTimers(this.preCommitSteps.size());
        this.postCommitStepTimes = newStepTimers(this.postCommitSteps.size());
        initFlushPublisher();
        MetricContextUtils.registerGauge(getMetricContext(), WATERMARK_COMMIT_TIME_METRIC, this.watermarkCommitTime);
        registerCommitStepGauges(this.preCommitSteps, this.preCommitStepTimes);
        registerCommitStepGauges(this.postCommitSteps, this.postCommitStepTimes);
    }

    /** Creates {@code count} zero-initialized timers, one per configured commit step. */
    private static List<AtomicLong> newStepTimers(int count) {
        List<AtomicLong> timers = Lists.newArrayListWithCapacity(count);
        for (int i = 0; i < count; i++) {
            timers.add(new AtomicLong(0L));
        }
        return timers;
    }

    /**
     * Registers a {@code commit.step.<alias>.time} gauge for every step. Each gauge is backed by the
     * same timer that {@link #doCommitSequence} updates, so it reports that step's latest duration.
     */
    private void registerCommitStepGauges(List<String> stepAliases, List<AtomicLong> stepTimes) {
        for (int i = 0; i < stepAliases.size(); i++) {
            MetricContextUtils.registerGauge(getMetricContext(),
                COMMIT_STEP_METRIC_PREFIX + stepAliases.get(i) + ".time", stepTimes.get(i));
        }
    }

    /** Returns a timed flush message if the flush interval has elapsed, otherwise {@code null}. */
    private StreamEntity<D> generateFlushMessageIfNecessary() {
        long now = System.currentTimeMillis();
        if (now - this.timeOfLastFlush > this.flushIntervalMillis) {
            return generateFlushMessage(now);
        }
        return null;
    }

    /**
     * Builds a flush control message, records it as the outstanding flush and resets the flush timer.
     *
     * @param now timestamp (ms) to record as the time of this flush
     */
    private StreamEntity<D> generateFlushMessage(long now) {
        log.debug("Injecting flush control message");
        FlushControlMessage build = FlushControlMessage.builder().flushReason("Timed flush").build();
        FlushAckable flushAckable = new FlushAckable();
        build.addCallBack(flushAckable);
        this.lastFlushAckable = flushAckable;
        this.hasOutstandingFlush = true;
        this.timeOfLastFlush = now;
        return build;
    }

    /**
     * Instantiates the publisher named by {@code flush.data.publisher.class}, defaulting to
     * {@code BaseDataPublisher}, unless one is already present.
     *
     * @throws RuntimeException if the class cannot be loaded or constructed
     * @throws ClassCastException if the configured class is not a {@link DataPublisher}
     */
    private void initFlushPublisher() {
        if (this.flushPublisher.isPresent()) {
            return;
        }
        String publisherClassName =
            ConfigUtils.getString(this.config, FLUSH_DATA_PUBLISHER_CLASS, DEFAULT_FLUSH_DATA_PUBLISHER_CLASS);
        try {
            // asSubclass gives a typed Class so the constructed instance is statically a DataPublisher.
            Class<? extends DataPublisher> publisherClass =
                Class.forName(publisherClassName).asSubclass(DataPublisher.class);
            DataPublisher publisher = GobblinConstructorUtils.invokeLongestConstructor(publisherClass, this.workUnitState);
            this.flushPublisher = Optional.of(publisher);
        } catch (ReflectiveOperationException e) {
            log.error("Error in instantiating Data Publisher {}", publisherClassName, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the next stream entity.
     *
     * <p>If a flush is outstanding, this first blocks for its ack and runs the commit sequence. Then,
     * in order:
     * <ul>
     *   <li>if the flush interval has elapsed, returns a timed flush message;</li>
     *   <li>otherwise reads a record via {@link #readRecordEnvelopeImpl()};</li>
     *   <li>if that record is a {@link FlushRecordEnvelope}, returns a flush message in its place;</li>
     *   <li>otherwise registers the record's watermark with the tracker and returns the record,
     *       which may be {@code null}.</li>
     * </ul>
     *
     * @throws RuntimeException if the outstanding flush was nacked
     */
    @Override
    public StreamEntity<D> readStreamEntityImpl() throws DataRecordException, IOException {
        if (this.hasOutstandingFlush) {
            completeOutstandingFlush();
        }
        StreamEntity<D> timedFlush = generateFlushMessageIfNecessary();
        if (timedFlush != null) {
            return timedFlush;
        }
        RecordEnvelope<D> envelope = readRecordEnvelopeImpl();
        if (envelope instanceof FlushRecordEnvelope) {
            // The subclass asked for an explicit flush; replace the marker with a real flush message.
            return generateFlushMessage(System.currentTimeMillis());
        }
        if (envelope != null) {
            this.watermarkTracker.unacknowledgedWatermark(envelope.getWatermark());
        }
        return envelope;
    }

    /**
     * Waits for the outstanding flush to be acked and then runs the commit sequence: pre-commit steps,
     * publishing, {@link #onFlushAck()} and post-commit steps.
     */
    private void completeOutstandingFlush() throws IOException {
        Throwable flushError = this.lastFlushAckable.waitForAck();
        if (flushError != null) {
            throw new RuntimeException("Error waiting for flush ack", flushError);
        }
        this.hasOutstandingFlush = false;
        doCommitSequence(this.preCommitSteps, true);
        publishTaskOutput();
        onFlushAck();
        doCommitSequence(this.postCommitSteps, false);
    }

    /**
     * Creates the {@link CommitStep} registered under {@code commitStepAlias}. The default returns
     * {@code null}, so subclasses that configure pre- or post-commit steps must override this.
     *
     * @param commitStepAlias alias taken from the pre/post commit step configuration
     * @param isPrecommit {@code true} when the step is part of the pre-commit sequence
     */
    public CommitStep initCommitStep(String commitStepAlias, boolean isPrecommit) throws IOException {
        return null;
    }

    /**
     * Executes the given commit steps in order. Each step is created lazily via {@link #initCommitStep},
     * and each step's duration is recorded in the matching timer.
     *
     * @param commitSteps either {@code preCommitSteps} or {@code postCommitSteps}
     * @param isPrecommit selects which timer list is index-aligned with {@code commitSteps}
     * @throws IOException if a step cannot be created or fails to execute
     */
    private void doCommitSequence(List<String> commitSteps, boolean isPrecommit) throws IOException {
        List<AtomicLong> stepTimes = isPrecommit ? this.preCommitStepTimes : this.postCommitStepTimes;
        for (int i = 0; i < commitSteps.size(); i++) {
            long startMillis = System.currentTimeMillis();
            String alias = commitSteps.get(i);
            CommitStep step = this.commitStepMap.get(alias);
            if (step == null) {
                step = initCommitStep(alias, isPrecommit);
                if (step == null) {
                    throw new IOException("No commit step could be created for alias " + alias
                        + "; initCommitStep() must be overridden to support it");
                }
                this.commitStepMap.put(alias, step);
            }
            log.info("Calling commit step: {}", alias);
            step.execute();
            stepTimes.get(i).set(System.currentTimeMillis() - startMillis);
        }
    }

    /** Hook invoked after the flush is acked and output is published; checkpoints watermarks by default. */
    protected void onFlushAck() throws IOException {
        checkPointWatermarks();
    }

    /**
     * Loads committed watermarks for the given ids from the watermark storage and caches them as
     * {@link #lastCommittedWatermarks}. On any failure, including absent storage, the cache becomes
     * an empty map.
     *
     * @param cls watermark class; must be assignable to {@link CheckpointableWatermark}
     * @param iterable ids whose committed watermarks are requested
     * @return a mutable map of committed watermarks (possibly empty)
     * @throws IllegalArgumentException if {@code cls} is not a {@link CheckpointableWatermark} class
     */
    public Map<String, CheckpointableWatermark> getCommittedWatermarks(Class cls, Iterable<String> iterable) {
        Preconditions.checkArgument(CheckpointableWatermark.class.isAssignableFrom(cls), "Watermark class " + cls.toString() + " is not a CheckPointableWatermark class");
        try {
            Map<String, CheckpointableWatermark> committed = this.watermarkStorage.get().getCommittedWatermarks(cls, iterable);
            // Copy into a mutable map: checkPointWatermarks() adds entries to it later.
            Map<String, CheckpointableWatermark> copy = Maps.newHashMap();
            if (committed != null) {
                copy.putAll(committed);
            }
            this.lastCommittedWatermarks = copy;
        } catch (Exception e) {
            log.warn("Failed to get watermarks... will start from the beginning", e);
            this.lastCommittedWatermarks = Maps.newHashMap();
        }
        return this.lastCommittedWatermarks;
    }

    /**
     * Publishes this task's output with the flush publisher.
     *
     * @throws IOException if no flush publisher is configured, or if publishing fails
     */
    protected void publishTaskOutput() throws IOException {
        if (!this.flushPublisher.isPresent()) {
            throw new IOException("Publish called without a flush publisher");
        }
        this.flushPublisher.get().publish(Collections.singletonList(this.workUnitState));
    }

    /** Nacks any outstanding flush so that anything waiting on it is released with an error. */
    public void shutdown() {
        if (this.hasOutstandingFlush) {
            this.lastFlushAckable.nack(new IOException("Extractor already shutdown"));
        }
    }

    /**
     * Commits all watermarks currently held by the tracker to the watermark storage. It then records
     * the commit duration and merges the committed watermarks into {@link #lastCommittedWatermarks}.
     * If no storage is configured, nothing is committed.
     */
    private void checkPointWatermarks() throws IOException {
        Map<String, CheckpointableWatermark> pending = this.watermarkTracker.getAllUnacknowledgedWatermarks();
        if (!this.watermarkStorage.isPresent()) {
            log.warn("No watermarkStorage found; Skipping checkpointing");
            return;
        }
        long startMillis = System.currentTimeMillis();
        this.watermarkStorage.get().commitWatermarks(pending.values());
        this.watermarkCommitTime.set(System.currentTimeMillis() - startMillis);
        if (this.lastCommittedWatermarks == null) {
            // getCommittedWatermarks() was never called; start tracking from this checkpoint.
            this.lastCommittedWatermarks = Maps.newHashMap();
        }
        this.lastCommittedWatermarks.putAll(pending);
    }

    /** Reads the next record from the source; may return a {@link FlushRecordEnvelope} to request a flush. */
    @Override
    public abstract RecordEnvelope<D> readRecordEnvelopeImpl() throws DataRecordException, IOException;

    /** Returns the cached committed watermarks; {@code null} until loaded or first checkpointed. */
    public Map<String, CheckpointableWatermark> getLastCommittedWatermarks() {
        return this.lastCommittedWatermarks;
    }

    /** Replaces the watermark storage; pass {@code Optional.absent()} to disable checkpointing. */
    public void setWatermarkStorage(Optional<WatermarkStorage> optional) {
        this.watermarkStorage = optional;
    }

    /** Returns the tracker that records the watermarks of records returned by this extractor. */
    public WatermarkTracker getWatermarkTracker() {
        return this.watermarkTracker;
    }
}
