package org.apache.beam.runners.spark.util;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.spark.SparkEnv;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockResult;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaBatchInfo;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Iterator;
import scala.reflect.ClassManifestFactory;
import scala.reflect.ClassTag;

/* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder.class */
public class GlobalWatermarkHolder {
    private static final Logger LOG = LoggerFactory.getLogger(GlobalWatermarkHolder.class);
    private static final Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap();
    private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS");
    private static final ClassTag<Map> WATERMARKS_TAG = ClassManifestFactory.fromClass(Map.class);
    private static volatile Map<Integer, SparkWatermarks> driverNodeWatermarks = null;
    private static volatile LoadingCache<String, Map<Integer, SparkWatermarks>> watermarkCache = null;
    private static volatile long lastWatermarkedBatchTime = 0;

    /* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder$SparkWatermarks.class */
    public static class SparkWatermarks implements Serializable {
        private final Instant lowWatermark;
        private final Instant highWatermark;
        private final Instant synchronizedProcessingTime;

        @VisibleForTesting
        public SparkWatermarks(Instant instant, Instant instant2, Instant instant3) {
            this.lowWatermark = instant;
            this.highWatermark = instant2;
            this.synchronizedProcessingTime = instant3;
        }

        public Instant getLowWatermark() {
            return this.lowWatermark;
        }

        public Instant getHighWatermark() {
            return this.highWatermark;
        }

        public Instant getSynchronizedProcessingTime() {
            return this.synchronizedProcessingTime;
        }

        public String toString() {
            return "SparkWatermarks{lowWatermark=" + this.lowWatermark + ", highWatermark=" + this.highWatermark + ", synchronizedProcessingTime=" + this.synchronizedProcessingTime + '}';
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder$WatermarkAdvancingStreamingListener.class */
    public static class WatermarkAdvancingStreamingListener extends JavaStreamingListener {
        private static final Logger LOG = LoggerFactory.getLogger(WatermarkAdvancingStreamingListener.class);

        private long timeOf(JavaBatchInfo javaBatchInfo) {
            return javaBatchInfo.batchTime().milliseconds();
        }

        private long laterOf(long j, long j2) {
            return Math.max(j, j2);
        }

        public void onBatchCompleted(JavaStreamingListenerBatchCompleted javaStreamingListenerBatchCompleted) {
            long timeOf = timeOf(javaStreamingListenerBatchCompleted.batchInfo());
            GlobalWatermarkHolder.advance(Long.toString(timeOf));
            long unused = GlobalWatermarkHolder.lastWatermarkedBatchTime = laterOf(GlobalWatermarkHolder.lastWatermarkedBatchTime, timeOf);
            LOG.info("Batch with timestamp: {} has completed, watermarks have been updated.", Long.valueOf(GlobalWatermarkHolder.lastWatermarkedBatchTime));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder$WatermarksLoader.class */
    public static class WatermarksLoader extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
        private WatermarksLoader() {
        }

        public Map<Integer, SparkWatermarks> load(@Nonnull String str) throws Exception {
            Map<Integer, SparkWatermarks> fetchSparkWatermarks = GlobalWatermarkHolder.fetchSparkWatermarks(SparkEnv.get().blockManager());
            return fetchSparkWatermarks != null ? fetchSparkWatermarks : Maps.newHashMap();
        }
    }

    public static void add(int i, SparkWatermarks sparkWatermarks) {
        Queue<SparkWatermarks> queue = sourceTimes.get(Integer.valueOf(i));
        if (queue == null) {
            queue = new ConcurrentLinkedQueue();
        }
        queue.offer(sparkWatermarks);
        sourceTimes.put(Integer.valueOf(i), queue);
    }

    @VisibleForTesting
    public static void addAll(Map<Integer, Queue<SparkWatermarks>> map) {
        for (Map.Entry<Integer, Queue<SparkWatermarks>> entry : map.entrySet()) {
            int intValue = entry.getKey().intValue();
            Queue<SparkWatermarks> value = entry.getValue();
            while (!value.isEmpty()) {
                add(intValue, value.poll());
            }
        }
    }

    public static long getLastWatermarkedBatchTime() {
        return lastWatermarkedBatchTime;
    }

    public static Map<Integer, SparkWatermarks> get(Long l) {
        if (canBypassRemoteWatermarkFetching()) {
            return getLocalWatermarkCopy();
        }
        if (watermarkCache == null) {
            watermarkCache = createWatermarkCache(l);
        }
        try {
            return (Map) watermarkCache.get("SINGLETON");
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private static boolean canBypassRemoteWatermarkFetching() {
        return driverNodeWatermarks != null;
    }

    private static synchronized LoadingCache<String, Map<Integer, SparkWatermarks>> createWatermarkCache(Long l) {
        return CacheBuilder.newBuilder().expireAfterWrite(l.longValue() / 2, TimeUnit.MILLISECONDS).build(new WatermarksLoader());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void advance(String str) {
        synchronized (GlobalWatermarkHolder.class) {
            BlockManager blockManager = SparkEnv.get().blockManager();
            Map<Integer, SparkWatermarks> computeNewWatermarks = computeNewWatermarks(blockManager);
            if (computeNewWatermarks.isEmpty()) {
                LOG.info("No new watermarks could be computed upon completion of batch: {}", str);
            } else {
                writeRemoteWatermarkBlock(computeNewWatermarks, blockManager);
                writeLocalWatermarkCopy(computeNewWatermarks);
            }
        }
    }

    private static void writeLocalWatermarkCopy(Map<Integer, SparkWatermarks> map) {
        driverNodeWatermarks = map;
    }

    private static Map<Integer, SparkWatermarks> getLocalWatermarkCopy() {
        return driverNodeWatermarks;
    }

    public static void advance() {
        advance("N/A");
    }

    private static Map<Integer, SparkWatermarks> computeNewWatermarks(BlockManager blockManager) {
        if (sourceTimes.isEmpty()) {
            return new HashMap();
        }
        HashMap hashMap = new HashMap();
        for (Map.Entry<Integer, Queue<SparkWatermarks>> entry : sourceTimes.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                Integer key = entry.getKey();
                Instant instant = BoundedWindow.TIMESTAMP_MIN_VALUE;
                Instant instant2 = BoundedWindow.TIMESTAMP_MIN_VALUE;
                Instant instant3 = BoundedWindow.TIMESTAMP_MIN_VALUE;
                Map<Integer, SparkWatermarks> initWatermarks = initWatermarks(blockManager);
                if (initWatermarks.containsKey(key)) {
                    SparkWatermarks sparkWatermarks = initWatermarks.get(key);
                    instant = sparkWatermarks.getLowWatermark();
                    instant2 = sparkWatermarks.getHighWatermark();
                    instant3 = sparkWatermarks.getSynchronizedProcessingTime();
                }
                SparkWatermarks poll = entry.getValue().poll();
                Instant lowWatermark = poll.getLowWatermark().isAfter(instant) ? poll.getLowWatermark() : instant;
                Instant highWatermark = poll.getHighWatermark().isAfter(instant2) ? poll.getHighWatermark() : instant2;
                Instant synchronizedProcessingTime = poll.getSynchronizedProcessingTime();
                Preconditions.checkState(!lowWatermark.isAfter(highWatermark), String.format("Low watermark %s cannot be later then high watermark %s", lowWatermark, highWatermark));
                Preconditions.checkState(synchronizedProcessingTime.isAfter(instant3), "Synchronized processing time must advance.");
                hashMap.put(key, new SparkWatermarks(lowWatermark, highWatermark, synchronizedProcessingTime));
            }
        }
        return hashMap;
    }

    private static void writeRemoteWatermarkBlock(Map<Integer, SparkWatermarks> map, BlockManager blockManager) {
        blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
        blockManager.putSingle(WATERMARKS_BLOCK_ID, map, StorageLevel.MEMORY_ONLY(), true, WATERMARKS_TAG);
        LOG.info("Put new watermark block: {}", map);
    }

    private static Map<Integer, SparkWatermarks> initWatermarks(BlockManager blockManager) {
        Map<Integer, SparkWatermarks> fetchSparkWatermarks = fetchSparkWatermarks(blockManager);
        if (fetchSparkWatermarks != null) {
            return fetchSparkWatermarks;
        }
        HashMap newHashMap = Maps.newHashMap();
        blockManager.putSingle(WATERMARKS_BLOCK_ID, newHashMap, StorageLevel.MEMORY_ONLY(), true, WATERMARKS_TAG);
        return newHashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<Integer, SparkWatermarks> fetchSparkWatermarks(BlockManager blockManager) {
        Option option = blockManager.get(WATERMARKS_BLOCK_ID, WATERMARKS_TAG);
        if (!option.isDefined()) {
            return null;
        }
        Iterator data = ((BlockResult) option.get()).data();
        Map<Integer, SparkWatermarks> map = (Map) data.next();
        do {
        } while (data.hasNext());
        return map;
    }

    @VisibleForTesting
    public static synchronized void clear() {
        sourceTimes.clear();
        lastWatermarkedBatchTime = 0L;
        writeLocalWatermarkCopy(null);
        SparkEnv sparkEnv = SparkEnv.get();
        if (sparkEnv != null) {
            sparkEnv.blockManager().removeBlock(WATERMARKS_BLOCK_ID, true);
        }
    }
}
