/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.web.metrics;

import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.geaflow.cluster.clustermanager.AbstractClusterManager;
import org.apache.geaflow.cluster.clustermanager.IClusterManager;
import org.apache.geaflow.cluster.rpc.RpcClient;
import org.apache.geaflow.cluster.rpc.RpcEndpointRef;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.encoder.RpcMessageEncoder;
import org.apache.geaflow.rpc.proto.Metrics;
import org.apache.geaflow.stats.model.MetricCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pulls metrics from every known driver over RPC and publishes the merged
 * result into a shared {@link MetricCache}.
 *
 * <p>Refreshes are rate-limited: {@link #update()} only triggers a new round of
 * requests when at least {@code updateIntervalMs} milliseconds have passed since
 * the previous round was started.
 */
public class MetricFetcher implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MetricFetcher.class);
    /** Default value, in milliseconds, used for the refresh interval. */
    private static final int DEFAULT_TIMEOUT = 10000;
    /** Driver ids as reported by the cluster manager; the map values are the RPC targets. */
    private final Map<Integer, String> driverIds;
    /** Shared cache that receives the merged metrics of a fully successful round. */
    private final MetricCache metricCache;
    /** Minimum number of milliseconds between two fetch rounds. */
    private final int updateIntervalMs;
    /** Wall-clock time (epoch millis) at which the last fetch round was started; 0 before the first round. */
    private long lastUpdateTime;

    /**
     * Creates a fetcher and initializes the RPC client with the given configuration.
     *
     * @param configuration configuration handed to {@code RpcClient.init}
     * @param clusterManager source of the driver ids; it is cast to
     *     {@link AbstractClusterManager}, so any other implementation causes a
     *     {@link ClassCastException}
     * @param metricCache cache that is overwritten with freshly fetched metrics
     */
    public MetricFetcher(Configuration configuration, IClusterManager clusterManager, MetricCache metricCache) {
        this.driverIds = ((AbstractClusterManager)clusterManager).getDriverIds();
        this.metricCache = metricCache;
        this.updateIntervalMs = DEFAULT_TIMEOUT;
        RpcClient.init(configuration);
    }

    /**
     * Starts a new fetch round if the refresh interval has elapsed since the
     * previous one; otherwise does nothing.
     *
     * <p>The start time is recorded before the requests are issued, so a round
     * whose requests fail still counts against the interval.
     */
    public synchronized void update() {
        long currentTime = System.currentTimeMillis();
        if (this.lastUpdateTime + (long)this.updateIntervalMs <= currentTime) {
            this.lastUpdateTime = currentTime;
            this.fetch();
        }
    }

    /**
     * Sends one metric query to each driver and, once every driver has answered
     * successfully, replaces the content of the shared cache with the merged result.
     *
     * <p>A failed request is only logged; the pending counter is not decremented
     * for it, so the shared cache keeps its previous content for that round.
     */
    private void fetch() {
        Metrics.MetricQueryRequest request = Metrics.MetricQueryRequest.newBuilder().build();
        // Scratch cache that collects the responses of this round only.
        final MetricCache newMetricCache = new MetricCache();
        // Number of drivers whose successful response is still outstanding.
        final AtomicInteger count = new AtomicInteger(this.driverIds.values().size());
        for (final String driverId : this.driverIds.values()) {
            // The returned future is not needed: results are delivered through the callback.
            RpcClient.getInstance().requestMetrics(driverId, request, new RpcEndpointRef.RpcCallback<Metrics.MetricQueryResponse>(){

                @Override
                public void onSuccess(Metrics.MetricQueryResponse value) {
                    MetricCache cache = (MetricCache)RpcMessageEncoder.decode((ByteString)value.getPayload());
                    // NOTE(review): callbacks for different drivers may run concurrently;
                    // confirm that MetricCache.mergeMetricCache is safe for that.
                    newMetricCache.mergeMetricCache(cache);
                    if (count.decrementAndGet() == 0) {
                        // Last outstanding driver answered: publish the complete snapshot.
                        MetricFetcher.this.metricCache.clearAll();
                        MetricFetcher.this.metricCache.mergeMetricCache(newMetricCache);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    LOGGER.warn("fail to fetch metric from {}", driverId, t);
                }
            });
        }
    }
}

