package org.apache.gobblin.writer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.source.extractor.CheckpointableWatermark;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.writer.WatermarkManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/writer/MultiWriterWatermarkManager.class */
/**
 * A {@link WatermarkManager} that periodically gathers the committable watermarks of every
 * registered {@link WatermarkAwareWriter}, merges them through a
 * {@link MultiWriterWatermarkTracker} and hands the result to a {@link WatermarkStorage}.
 *
 * <p>The work is done by {@link #_watermarkCommitter}, which {@link #start()} schedules on a
 * single-threaded scheduled executor and which {@link #close()} runs one final time on the
 * calling thread.
 */
public class MultiWriterWatermarkManager implements WatermarkManager {
    /** How long {@link #close()} waits for the commit thread pool to terminate. */
    private static final long TERMINATION_WAIT_MILLIS = 1000L;

    private final Queue<WatermarkAwareWriter> _watermarkAwareWriters;
    private final WatermarkStorage _watermarkStorage;
    private final long _commitIntervalMillis;
    private final ScheduledExecutorService _watermarkCommitThreadPool;
    private final Logger _logger;
    private final WatermarkManager.RetrievalStatus _retrievalStatus;
    private final WatermarkManager.CommitStatus _commitStatus;

    /**
     * One retrieval-and-commit cycle: collects watermarks from all registered writers, then
     * commits them to the watermark storage. Failures in either phase are recorded on the
     * corresponding status object and logged; exceptions are not propagated.
     */
    @VisibleForTesting
    final Runnable _watermarkCommitter = new Runnable() {
        @Override
        public void run() {
            long startNanos = System.nanoTime();
            Map<String, CheckpointableWatermark> watermarksToCommit = null;

            // Phase 1: retrieve the committable watermarks from every registered writer.
            try {
                _retrievalStatus.onAttempt();
                MultiWriterWatermarkTracker tracker = new MultiWriterWatermarkTracker();
                for (WatermarkAwareWriter writer : _watermarkAwareWriters) {
                    Map<String, CheckpointableWatermark> writerWatermarks = writer.getCommittableWatermark();
                    _logger.debug("Retrieved from writer {} : watermark {} ", writer.getClass().getName(), writerWatermarks);
                    tracker.committedWatermarks(writerWatermarks);
                }
                watermarksToCommit = tracker.getAllCommitableWatermarks();
                _retrievalStatus.onSuccess(watermarksToCommit);
            } catch (Exception e) {
                _retrievalStatus.onFailure(e);
                _logger.error("Failed to get watermark", e);
            }

            // Phase 2: commit. The lock is this Runnable, so the scheduled run and the final
            // run issued by close() cannot commit at the same time.
            synchronized (this) {
                if (watermarksToCommit != null && !watermarksToCommit.isEmpty()) {
                    try {
                        _commitStatus.onAttempt();
                        _logger.info("Will commit watermark {}", watermarksToCommit);
                        _watermarkStorage.commitWatermarks(watermarksToCommit.values());
                        _commitStatus.onSuccess(watermarksToCommit);
                    } catch (Exception e) {
                        _commitStatus.onFailure(e, watermarksToCommit);
                        _logger.error("Failed to write watermark", e);
                    }
                } else {
                    // Only reported when there really was nothing to commit (retrieval failed
                    // or produced no watermarks).
                    _logger.info("Nothing to commit");
                }
            }

            _logger.info("Duration of run {} milliseconds", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
        }
    };

    /**
     * Creates a manager that is not yet running; call {@link #start()} to begin committing.
     *
     * @param watermarkStorage destination for committed watermarks; must not be {@code null}
     * @param j delay, in milliseconds, between the end of one commit run and the start of the next
     * @param optional logger to use; when absent a logger for this class is created. It is also
     *     passed to the thread factory of the commit thread pool.
     * @throws IllegalArgumentException if {@code watermarkStorage} is {@code null}
     */
    public MultiWriterWatermarkManager(WatermarkStorage watermarkStorage, long j, Optional<Logger> optional) {
        Preconditions.checkArgument(watermarkStorage != null, "WatermarkStorage cannot be null");
        this._watermarkAwareWriters = new ConcurrentLinkedQueue<>();
        this._watermarkStorage = watermarkStorage;
        this._commitIntervalMillis = j;
        this._logger = optional.or(LoggerFactory.getLogger(MultiWriterWatermarkManager.class));
        this._watermarkCommitThreadPool = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(optional, Optional.of("WatermarkManager-%d")));
        this._retrievalStatus = new WatermarkManager.RetrievalStatus();
        this._commitStatus = new WatermarkManager.CommitStatus();
    }

    /**
     * Adds a writer whose committable watermarks will be included in subsequent commit runs.
     *
     * @param watermarkAwareWriter the writer to track
     */
    public void registerWriter(WatermarkAwareWriter watermarkAwareWriter) {
        this._watermarkAwareWriters.add(watermarkAwareWriter);
        this._logger.info("Registered a watermark aware writer {}", watermarkAwareWriter.getClass().getName());
    }

    /**
     * Schedules the watermark committer to run immediately and then repeatedly, with the
     * configured commit interval as the delay between consecutive runs.
     */
    @Override
    public void start() {
        this._watermarkCommitThreadPool.scheduleWithFixedDelay(this._watermarkCommitter, 0L, this._commitIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts down the commit thread pool, waits up to {@value #TERMINATION_WAIT_MILLIS} ms for it
     * to terminate, and then performs exactly one final commit run on the calling thread,
     * whether or not the wait completed normally.
     *
     * @throws IOException if the calling thread is interrupted while waiting for termination;
     *     the final commit is still attempted and the thread's interrupt status is restored
     *     afterwards
     */
    @Override
    public void close() throws IOException {
        this._logger.info("Watermark committer closing");
        this._watermarkCommitThreadPool.shutdown();
        boolean interrupted = false;
        try {
            long startNanos = System.nanoTime();
            this._watermarkCommitThreadPool.awaitTermination(TERMINATION_WAIT_MILLIS, TimeUnit.MILLISECONDS);
            this._logger.info("Duration of termination wait was {} milliseconds", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
        } catch (InterruptedException e) {
            interrupted = true;
            throw new IOException("Interrupted while waiting for committer to shutdown", e);
        } finally {
            try {
                this._logger.info("Watermark committer: one last commit before shutting down");
                this._watermarkCommitter.run();
            } finally {
                // Restore the interrupt only after the final commit so that the commit itself
                // is not cut short by the pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Returns the status object updated by the commit phase of each run. */
    @Override
    public WatermarkManager.CommitStatus getCommitStatus() {
        return this._commitStatus;
    }

    /** Returns the status object updated by the retrieval phase of each run. */
    @Override
    public WatermarkManager.RetrievalStatus getRetrievalStatus() {
        return this._retrievalStatus;
    }
}
