/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.euphoria.core.translate;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.Counter;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.Histogram;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.Timer;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AccumulatorProvider} that backs counters and histograms with Beam {@link Metrics}.
 *
 * <p>Counters delegate to {@code Metrics.counter(namespace, name)} and histograms delegate to
 * {@code Metrics.distribution(namespace, name)}. Every accumulator must be requested with both a
 * namespace and a name; the name-only lookups and timers throw
 * {@link UnsupportedOperationException}.
 *
 * <p>Accumulator wrappers are cached per (namespace, name) pair in {@link ConcurrentHashMap}s, so
 * repeated lookups of the same pair return the same wrapper instance.
 */
public class BeamAccumulatorProvider implements AccumulatorProvider {
    private static final Logger LOG = LoggerFactory.getLogger(BeamAccumulatorProvider.class);
    private static final String KEY_METRIC_SEPARATOR = "::";

    /** Counter wrappers already handed out, keyed by {@link #getMetricsKey(String, String)}. */
    private final Map<String, Counter> counterMap = new ConcurrentHashMap<>();

    /** Histogram wrappers already handed out, keyed by {@link #getMetricsKey(String, String)}. */
    private final Map<String, Histogram> histogramMap = new ConcurrentHashMap<>();

    /** Instances are only created by the nested {@link Factory}. */
    private BeamAccumulatorProvider() {
    }

    /**
     * Returns the factory that produces this provider.
     *
     * @return the singleton {@link Factory}
     */
    public static Factory getFactory() {
        return Factory.get();
    }

    /**
     * Not supported: a namespace is required.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Counter getCounter(String name) {
        throw new UnsupportedOperationException("BeamAccumulatorProvider doesn't support getCounter(String name). Please specify namespace and name.");
    }

    /**
     * Returns the counter for the given namespace and name, creating it on first request.
     *
     * @param namespace metric namespace passed to {@code Metrics.counter}
     * @param name metric name passed to {@code Metrics.counter}
     * @return the cached counter wrapper for this (namespace, name) pair
     * @throws NullPointerException if {@code namespace} or {@code name} is null
     */
    @Override
    public Counter getCounter(String namespace, String name) {
        String key = getMetricsKey(namespace, name);
        return counterMap.computeIfAbsent(key, unused -> new BeamMetricsCounter(namespace, name));
    }

    /**
     * Not supported: a namespace is required.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Histogram getHistogram(String name) {
        throw new UnsupportedOperationException("BeamAccumulatorProvider doesn't support getHistogram(String name). Please specify namespace and name.");
    }

    /**
     * Returns the histogram for the given namespace and name, creating it on first request.
     *
     * @param namespace metric namespace passed to {@code Metrics.distribution}
     * @param name metric name passed to {@code Metrics.distribution}
     * @return the cached histogram wrapper for this (namespace, name) pair
     * @throws NullPointerException if {@code namespace} or {@code name} is null
     */
    @Override
    public Histogram getHistogram(String namespace, String name) {
        String key = getMetricsKey(namespace, name);
        return histogramMap.computeIfAbsent(key, unused -> new BeamMetricsHistogram(namespace, name));
    }

    /**
     * Not supported by this provider.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Timer getTimer(String name) {
        throw new UnsupportedOperationException("BeamAccumulatorProvider doesn't support getTimer(String name). Please specify namespace and name.");
    }

    /**
     * Builds the cache key for a (namespace, name) pair.
     *
     * <p>The key has the form {@code <namespace length>::<namespace>::<name>}. The length prefix
     * makes the encoding unambiguous even when the namespace or the name itself contains the
     * separator; a plain {@code namespace::name} key would map both {@code ("a::b", "c")} and
     * {@code ("a", "b::c")} to {@code "a::b::c"} and hand back the wrong accumulator.
     *
     * @throws NullPointerException if {@code namespace} or {@code name} is null
     */
    private static String getMetricsKey(String namespace, String name) {
        // String.concat rejects null arguments, so a null name fails fast here instead of being
        // silently rendered as the text "null".
        return Integer.toString(namespace.length())
            .concat(KEY_METRIC_SEPARATOR)
            .concat(namespace)
            .concat(KEY_METRIC_SEPARATOR)
            .concat(name);
    }

    /** Holds the namespace and name shared by the Beam-backed accumulator wrappers. */
    private abstract static class BeamMetrics {
        private final String namespace;
        private final String name;

        BeamMetrics(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }

        /** Returns the metric name this accumulator reports under. */
        public String getName() {
            return name;
        }

        /** Returns the metric namespace this accumulator reports under. */
        public String getNamespace() {
            return namespace;
        }
    }

    /** {@link Histogram} that forwards every recorded value to a Beam {@link Distribution}. */
    public static class BeamMetricsHistogram extends BeamMetrics implements Histogram {
        BeamMetricsHistogram(String namespace, String name) {
            super(namespace, name);
        }

        /** Records a single occurrence of {@code value}. */
        @Override
        public void add(long value) {
            Metrics.distribution(getNamespace(), getName()).update(value);
        }

        /**
         * Records {@code times} occurrences of {@code value}, one {@code update} call per
         * occurrence. Nothing is recorded when {@code times} is zero or negative.
         */
        @Override
        public void add(long value, long times) {
            // Look the distribution up once and reuse it for every occurrence.
            Distribution distribution = Metrics.distribution(getNamespace(), getName());
            for (long i = 0L; i < times; ++i) {
                distribution.update(value);
            }
        }
    }

    /** {@link Counter} that forwards increments to a Beam counter metric. */
    public static class BeamMetricsCounter extends BeamMetrics implements Counter {
        BeamMetricsCounter(String namespace, String name) {
            super(namespace, name);
        }

        /** Increments the Beam counter by {@code value}. */
        @Override
        public void increment(long value) {
            Metrics.counter(getNamespace(), getName()).inc(value);
        }

        /** Increments the Beam counter via its no-argument {@code inc()}. */
        @Override
        public void increment() {
            Metrics.counter(getNamespace(), getName()).inc();
        }
    }

    /**
     * Singleton factory that always returns the same shared {@link BeamAccumulatorProvider}.
     */
    public static class Factory implements AccumulatorProvider.Factory {
        private static final Factory INSTANCE = new Factory();
        private static final AccumulatorProvider PROVIDER = new BeamAccumulatorProvider();

        /** Flipped on the first {@link #create()} call so the info message is logged only once. */
        private static final AtomicBoolean isLogged = new AtomicBoolean();

        private Factory() {
        }

        /** Returns the singleton factory instance. */
        public static Factory get() {
            return INSTANCE;
        }

        /**
         * Returns the shared provider, logging an informational message on the first call only.
         *
         * @return the single shared {@link BeamAccumulatorProvider}
         */
        @Override
        public AccumulatorProvider create() {
            if (isLogged.compareAndSet(false, true)) {
                LOG.info("Using accumulators with BeamAccumulatorProvider");
            }
            return PROVIDER;
        }
    }
}

