package gobblin.writer;

import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import gobblin.configuration.State;
import gobblin.instrumented.Instrumentable;
import gobblin.instrumented.Instrumented;
import gobblin.metrics.GobblinMetrics;
import gobblin.metrics.MetricContext;
import gobblin.metrics.Tag;
import gobblin.source.extractor.CheckpointableWatermark;
import gobblin.util.ConfigUtils;
import gobblin.util.ExecutorsUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks {@link AcknowledgableWatermark}s per source, in the order they were handed to
 * {@link #track(AcknowledgableWatermark)}.
 *
 * <p>Once started, a single-threaded scheduler runs two periodic tasks:
 * <ul>
 *   <li>a sweeper ({@link #sweep()}) that removes acknowledged watermarks from the head of each
 *       per-source queue, and</li>
 *   <li>a stability checker ({@link #checkStability()}) that sets an abort flag when the number of
 *       inserted-but-not-yet-swept watermarks exceeds the configured lag threshold. Once the flag
 *       is set, {@link #track(AcknowledgableWatermark)} throws a {@link RuntimeException}.</li>
 * </ul>
 *
 * <p>Configuration keys read by the constructor: {@value #WATERMARK_TRACKER_SWEEP_INTERVAL_MS},
 * {@code watermark.tracker.stabilityCheckIntervalMillis} and {@code watermark.tracker.lagThreshold}.
 */
@NotThreadSafe
public class FineGrainedWatermarkTracker implements Instrumentable, Closeable {
    /** Config key: period, in milliseconds, between two runs of the sweeper. */
    public static final String WATERMARK_TRACKER_SWEEP_INTERVAL_MS = "watermark.tracker.sweepIntervalMillis";
    /** Default sweep period in milliseconds. */
    public static final Long WATERMARK_TRACKER_SWEEP_INTERVAL_MS_DEFAULT = 100L;

    /** Config key: period, in milliseconds, between two runs of the stability checker. */
    private static final String WATERMARK_TRACKER_STABILITY_CHECK_INTERVAL_MS = "watermark.tracker.stabilityCheckIntervalMillis";
    private static final Long WATERMARK_TRACKER_STABILITY_CHECK_INTERVAL_MS_DEFAULT = 10000L;

    /** Config key: maximum tolerated difference between inserted and swept watermark counts. */
    private static final String WATERMARK_TRACKER_LAG_THRESHOLD = "watermark.tracker.lagThreshold";
    private static final Long WATERMARK_TRACKER_LAG_THRESHOLD_DEFAULT = 100000L;

    private static final String WATERMARKS_INSERTED_METER = "watermark.tracker.inserted";
    private static final String WATERMARKS_SWEPT_METER = "watermark.tracker.swept";

    /** Number of nanoseconds in one millisecond; used to report sweep duration. */
    private static final long MILLIS_TO_NANOS = 1000000;

    private static final Logger log = LoggerFactory.getLogger(FineGrainedWatermarkTracker.class);

    private final long _sweepIntervalMillis;
    private final long _stabilityCheckIntervalMillis;
    private final long _watermarkLagThreshold;
    /** Created lazily by {@link #start()}; {@code null} until then. */
    private ScheduledExecutorService _executorService;
    private final boolean _instrumentationEnabled;
    private MetricContext _metricContext;
    private Meter _watermarksInserted;
    private Meter _watermarksSwept;
    private final AtomicBoolean _started;
    private final AtomicBoolean _abort;
    private final Runnable _sweeper;
    private final Runnable _stabilityChecker;
    /** When true, the first call to {@link #track(AcknowledgableWatermark)} starts the background tasks. */
    private boolean _autoStart = true;
    /**
     * Per-source queues of tracked watermarks, oldest first.
     *
     * NOTE(review): this is a plain HashMap that track() mutates on the caller thread while the
     * scheduled sweeper iterates it on the executor thread. Consider a concurrent map; that was not
     * done here because it would reject a null source key, which HashMap currently accepts.
     */
    private final Map<String, Deque<AcknowledgableWatermark>> _watermarksMap = new HashMap<>();
    /** Owns the metric contexts created by this tracker; closed in {@link #close()}. */
    protected final Closer _closer = Closer.create();

    /**
     * Builds a tracker from configuration. No background work is scheduled until {@link #start()}
     * is called, either explicitly or implicitly by the first {@link #track(AcknowledgableWatermark)}.
     *
     * @param config source of the sweep interval, stability-check interval, lag threshold and
     *               metrics settings; missing keys fall back to the defaults declared on this class
     */
    public FineGrainedWatermarkTracker(Config config) {
        this._sweepIntervalMillis =
            ConfigUtils.getLong(config, WATERMARK_TRACKER_SWEEP_INTERVAL_MS, WATERMARK_TRACKER_SWEEP_INTERVAL_MS_DEFAULT);
        this._stabilityCheckIntervalMillis = ConfigUtils.getLong(config, WATERMARK_TRACKER_STABILITY_CHECK_INTERVAL_MS,
            WATERMARK_TRACKER_STABILITY_CHECK_INTERVAL_MS_DEFAULT);
        this._watermarkLagThreshold =
            ConfigUtils.getLong(config, WATERMARK_TRACKER_LAG_THRESHOLD, WATERMARK_TRACKER_LAG_THRESHOLD_DEFAULT);
        this._instrumentationEnabled = GobblinMetrics.isEnabled(config);
        this._metricContext = (MetricContext) this._closer.register(
            Instrumented.getMetricContext(ConfigUtils.configToState(config), getClass()));
        regenerateMetrics();
        this._started = new AtomicBoolean(false);
        this._abort = new AtomicBoolean(false);
        this._sweeper = new Runnable() {
            @Override
            public void run() {
                sweep();
            }
        };
        this._stabilityChecker = new Runnable() {
            @Override
            public void run() {
                checkStability();
            }
        };
    }

    /**
     * Controls whether {@link #track(AcknowledgableWatermark)} starts the background tasks on first use.
     *
     * @param autoStart {@code false} to keep the tracker passive so tests can call {@link #sweep()} directly
     */
    @VisibleForTesting
    void setAutoStart(boolean autoStart) {
        this._autoStart = autoStart;
    }

    /**
     * Appends a watermark to the queue of its source, creating the queue on first sight of that
     * source, and marks the "inserted" meter.
     *
     * @param acknowledgableWatermark the watermark to track
     * @throws RuntimeException if the stability checker has set the abort flag
     */
    public void track(AcknowledgableWatermark acknowledgableWatermark) {
        if (!this._started.get() && this._autoStart) {
            start();
        }
        maybeAbort();
        String source = acknowledgableWatermark.getCheckpointableWatermark().getSource();
        Deque<AcknowledgableWatermark> sourceQueue = this._watermarksMap.get(source);
        if (sourceQueue == null) {
            sourceQueue = new ConcurrentLinkedDeque<>();
            this._watermarksMap.put(source, sourceQueue);
        }
        sourceQueue.add(acknowledgableWatermark);
        this._watermarksInserted.mark();
    }

    /** Throws if the abort flag has been raised by {@link #checkStability()}. */
    private void maybeAbort() throws RuntimeException {
        if (this._abort.get()) {
            throw new RuntimeException("Aborting Watermark tracking");
        }
    }

    /**
     * Raises the abort flag when the count of inserted watermarks exceeds the count of swept
     * watermarks by more than the configured lag threshold. The flag is never cleared by this class.
     */
    public void checkStability() {
        // Read each counter once so the logged values are the ones the decision was based on.
        long inserted = this._watermarksInserted.getCount();
        long swept = this._watermarksSwept.getCount();
        if (inserted - swept > this._watermarkLagThreshold) {
            log.error("Setting abort flag for Watermark tracking because the lag between the watermarksInserted: {} and watermarksSwept: {} is greater than the threshold: {}", inserted, swept, this._watermarkLagThreshold);
            this._abort.set(true);
        }
    }

    /**
     * Returns, for each source, the last watermark in the leading run of acknowledged watermarks,
     * i.e. the most recent watermark that has no unacknowledged watermark before it in the queue.
     *
     * @return a new map from source to that watermark; sources whose first tracked watermark is
     *         unacknowledged (or whose queue is empty) are absent
     */
    public Map<String, CheckpointableWatermark> getCommittableWatermarks() {
        Map<String, CheckpointableWatermark> committable = new HashMap<>(this._watermarksMap.size());
        for (Map.Entry<String, Deque<AcknowledgableWatermark>> entry : this._watermarksMap.entrySet()) {
            AcknowledgableWatermark lastAcked = null;
            for (AcknowledgableWatermark candidate : entry.getValue()) {
                if (!candidate.isAcked()) {
                    break;
                }
                lastAcked = candidate;
            }
            if (lastAcked != null) {
                committable.put(entry.getKey(), lastAcked.getCheckpointableWatermark());
            }
        }
        return committable;
    }

    /**
     * Returns, for each source, the first (oldest) watermark that has not been acknowledged.
     *
     * @return a new map from source to that watermark; sources with no unacknowledged watermark
     *         are absent
     */
    public Map<String, CheckpointableWatermark> getUnacknowledgedWatermarks() {
        Map<String, CheckpointableWatermark> unacknowledged = new HashMap<>(this._watermarksMap.size());
        for (Map.Entry<String, Deque<AcknowledgableWatermark>> entry : this._watermarksMap.entrySet()) {
            for (AcknowledgableWatermark candidate : entry.getValue()) {
                if (!candidate.isAcked()) {
                    unacknowledged.put(entry.getKey(), candidate.getCheckpointableWatermark());
                    break;
                }
            }
        }
        return unacknowledged;
    }

    /**
     * Schedules the sweeper and the stability checker at their configured fixed rates on a new
     * single-threaded scheduler. Calling this again after a successful start has no effect.
     */
    public synchronized void start() {
        if (!this._started.get()) {
            this._executorService = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log)));
            this._executorService.scheduleAtFixedRate(this._sweeper, 0L, this._sweepIntervalMillis, TimeUnit.MILLISECONDS);
            this._executorService.scheduleAtFixedRate(this._stabilityChecker, 0L, this._stabilityCheckIntervalMillis,
                TimeUnit.MILLISECONDS);
        }
        this._started.set(true);
    }

    /**
     * Requests shutdown of the scheduler (if one was started) and then closes every resource
     * registered with {@link #_closer}, even if the shutdown request throws.
     *
     * @throws IOException if closing a registered resource fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this._executorService != null) {
                this._executorService.shutdown();
            }
        } finally {
            this._closer.close();
        }
    }

    /**
     * Removes acknowledged watermarks from the head of every per-source queue and marks the
     * "swept" meter with the number removed.
     *
     * @return the total number of watermarks removed across all sources
     */
    @VisibleForTesting
    synchronized int sweep() {
        long startNanos = System.nanoTime();
        int swept = 0;
        for (Deque<AcknowledgableWatermark> sourceQueue : this._watermarksMap.values()) {
            swept += sweepAcknowledgedPrefix(sourceQueue);
        }
        log.debug("Swept {} watermarks in {} millis", swept, (System.nanoTime() - startNanos) / MILLIS_TO_NANOS);
        this._watermarksSwept.mark(swept);
        return swept;
    }

    /**
     * Pops the head of {@code sourceQueue} for as long as both the head and the element right after
     * it are acknowledged. The head is therefore only removed when an acknowledged successor exists,
     * which keeps the most recent acknowledged watermark of the leading run in the queue for
     * {@link #getCommittableWatermarks()} to report.
     *
     * @return the number of elements removed
     */
    private static int sweepAcknowledgedPrefix(Deque<AcknowledgableWatermark> sourceQueue) {
        int removed = 0;
        while (true) {
            // A fresh iterator per round: the head changes after every pop.
            Iterator<AcknowledgableWatermark> firstTwo = sourceQueue.iterator();
            if (!firstTwo.hasNext() || !firstTwo.next().isAcked()) {
                return removed;
            }
            if (!firstTwo.hasNext()) {
                return removed;
            }
            AcknowledgableWatermark successor = firstTwo.next();
            if (successor == null || !successor.isAcked()) {
                return removed;
            }
            sourceQueue.pop();
            removed++;
        }
    }

    /**
     * Replaces the metric context with a child of the current one carrying the given tags, registers
     * it for closing, and re-creates the meters against it.
     */
    @Override
    public void switchMetricContext(List<Tag<?>> tags) {
        this._metricContext = (MetricContext) this._closer.register(Instrumented.newContextFromReferenceContext(this._metricContext, tags, Optional.absent()));
        regenerateMetrics();
    }

    /**
     * Replaces the metric context with the supplied one and re-creates the meters against it.
     * The supplied context is not registered with {@link #_closer}.
     */
    @Override
    public void switchMetricContext(MetricContext metricContext) {
        this._metricContext = metricContext;
        regenerateMetrics();
    }

    /** Returns a new, empty tag list; this tracker contributes no tags of its own. */
    @Override
    public List<Tag<?>> generateTags(State state) {
        return Lists.newArrayList();
    }

    @Override
    @Nonnull
    public MetricContext getMetricContext() {
        return this._metricContext;
    }

    @Override
    public boolean isInstrumentationEnabled() {
        return this._instrumentationEnabled;
    }

    /** Looks up the "inserted" and "swept" meters on the current metric context. */
    protected void regenerateMetrics() {
        this._watermarksInserted = this._metricContext.meter(WATERMARKS_INSERTED_METER);
        this._watermarksSwept = this._metricContext.meter(WATERMARKS_SWEPT_METER);
    }
}
