/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.util;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class JobManagerWatermarkTracker
extends WatermarkTracker {
    private GlobalAggregateManager aggregateManager;
    private final String aggregateName;
    private final WatermarkAggregateFunction aggregateFunction = new WatermarkAggregateFunction();
    private final long logAccumulatorIntervalMillis;
    private long updateTimeoutCount;

    public JobManagerWatermarkTracker(String aggregateName) {
        this(aggregateName, -1L);
    }

    public JobManagerWatermarkTracker(String aggregateName, long logAccumulatorIntervalMillis) {
        this.aggregateName = aggregateName;
        this.logAccumulatorIntervalMillis = logAccumulatorIntervalMillis;
    }

    @Override
    public long updateWatermark(long localWatermark) {
        WatermarkUpdate update = new WatermarkUpdate();
        update.id = this.getSubtaskId();
        update.watermark = localWatermark;
        try {
            byte[] resultBytes = (byte[])this.aggregateManager.updateGlobalAggregate(this.aggregateName, (Object)InstantiationUtil.serializeObject((Object)update), (AggregateFunction)this.aggregateFunction);
            WatermarkResult result = (WatermarkResult)InstantiationUtil.deserializeObject((byte[])resultBytes, (ClassLoader)this.getClass().getClassLoader());
            this.updateTimeoutCount += result.updateTimeoutCount;
            return result.watermark;
        }
        catch (IOException | ClassNotFoundException ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public void open(RuntimeContext context) {
        super.open(context);
        this.aggregateFunction.updateTimeoutMillis = super.getUpdateTimeoutMillis();
        this.aggregateFunction.logAccumulatorIntervalMillis = this.logAccumulatorIntervalMillis;
        Preconditions.checkArgument((boolean)(context instanceof StreamingRuntimeContext));
        StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext)context;
        this.aggregateManager = runtimeContext.getGlobalAggregateManager();
    }

    @Override
    public long getUpdateTimeoutCount() {
        return this.updateTimeoutCount;
    }

    private static class WatermarkAggregateFunction
    implements AggregateFunction<byte[], Map<String, WatermarkTracker.WatermarkState>, byte[]> {
        private long updateTimeoutMillis = 60000L;
        private long logAccumulatorIntervalMillis = -1L;
        static long addCount;
        static long lastLogged;
        private static final Logger LOG;

        private WatermarkAggregateFunction() {
        }

        public Map<String, WatermarkTracker.WatermarkState> createAccumulator() {
            return new HashMap<String, WatermarkTracker.WatermarkState>();
        }

        public Map<String, WatermarkTracker.WatermarkState> add(byte[] valueBytes, Map<String, WatermarkTracker.WatermarkState> accumulator) {
            WatermarkUpdate value;
            ++addCount;
            try {
                value = (WatermarkUpdate)InstantiationUtil.deserializeObject((byte[])valueBytes, (ClassLoader)this.getClass().getClassLoader());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            WatermarkTracker.WatermarkState ws = accumulator.get(value.id);
            if (ws == null) {
                ws = new WatermarkTracker.WatermarkState();
                accumulator.put(value.id, ws);
            }
            ws.watermark = value.watermark;
            ws.lastUpdated = System.currentTimeMillis();
            return accumulator;
        }

        public byte[] getResult(Map<String, WatermarkTracker.WatermarkState> accumulator) {
            long updateTimeoutCount = 0L;
            long currentTime = System.currentTimeMillis();
            long globalWatermark = Long.MAX_VALUE;
            for (Map.Entry<String, WatermarkTracker.WatermarkState> e : accumulator.entrySet()) {
                WatermarkTracker.WatermarkState ws = e.getValue();
                if (ws.lastUpdated + this.updateTimeoutMillis < currentTime) {
                    if (ws.watermark >= Long.MAX_VALUE) continue;
                    ++updateTimeoutCount;
                    continue;
                }
                globalWatermark = Math.min(ws.watermark, globalWatermark);
            }
            WatermarkResult result = new WatermarkResult();
            result.watermark = globalWatermark == Long.MAX_VALUE ? Long.MIN_VALUE : globalWatermark;
            result.updateTimeoutCount = updateTimeoutCount;
            if (this.logAccumulatorIntervalMillis > 0L && currentTime - lastLogged > this.logAccumulatorIntervalMillis) {
                lastLogged = System.currentTimeMillis();
                LOG.info("WatermarkAggregateFunction added: {}, timeout: {}, map: {}", new Object[]{addCount, updateTimeoutCount, accumulator});
            }
            try {
                return InstantiationUtil.serializeObject((Object)result);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public Map<String, WatermarkTracker.WatermarkState> merge(Map<String, WatermarkTracker.WatermarkState> accumulatorA, Map<String, WatermarkTracker.WatermarkState> accumulatorB) {
            throw new UnsupportedOperationException();
        }

        static {
            LOG = LoggerFactory.getLogger(WatermarkAggregateFunction.class);
        }
    }

    protected static class WatermarkResult
    implements Serializable {
        protected long watermark = Long.MIN_VALUE;
        protected long updateTimeoutCount = 0L;

        protected WatermarkResult() {
        }
    }

    protected static class WatermarkUpdate
    implements Serializable {
        protected long watermark = Long.MIN_VALUE;
        protected String id;

        protected WatermarkUpdate() {
        }
    }
}

