/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals.metrics;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.List;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accumulator for Kafka metrics whose underlying {@link Measurable} is an {@link Avg}.
 *
 * <p>Averages cannot be combined by operating on the averaged values alone, so this class
 * extracts the underlying sum and event count from Kafka's {@link Avg} statistic (via
 * reflection on non-public members) and merges those instead. The merged average is
 * published through the inherited {@code mergedValue} field.
 *
 * <p>The sum/count pair is captured into {@link #lastSumCount} whenever the accumulator is
 * serialized, so that it is available on the receiving side.
 * NOTE(review): presumably the Kafka metric itself is not carried across serialization by
 * the superclass — confirm against {@code DefaultKafkaMetricAccumulator}.
 */
public class AvgKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator {
    private static final Logger LOG = LoggerFactory.getLogger(AvgKafkaMetricAccumulator.class);

    /**
     * Sum/count captured at the last serialization or produced by the last merge;
     * {@code null} until one of those has happened.
     */
    private AvgSumCount lastSumCount;

    /**
     * Creates an accumulator backed by the given Kafka metric.
     *
     * @param kafkaMetric the Kafka metric to read from; its measurable must be an {@link Avg}
     *     by the time {@link #merge(Accumulator)} or serialization reads it
     */
    public AvgKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
        super(kafkaMetric);
    }

    /**
     * Merges the sum and event count of {@code other} into this accumulator and recomputes
     * the merged average.
     *
     * <p>For each side, the stored {@link #lastSumCount} is used when present; otherwise the
     * values are read from that side's Kafka metric. The merged totals are retained in this
     * accumulator so that subsequent merges keep accumulating instead of starting over from
     * the Kafka metric. {@code other} is not modified.
     *
     * <p>If the merged event count is zero the resulting average is {@code NaN}
     * (floating-point {@code 0.0 / 0}).
     *
     * @param other the accumulator to merge in; must be an {@code AvgKafkaMetricAccumulator}
     * @throws RuntimeException if {@code other} is of an incompatible type, if a metric's
     *     measurable is not an {@link Avg}, or if the reflective extraction fails
     */
    @Override
    public void merge(Accumulator<Void, Double> other) {
        if (!(other instanceof AvgKafkaMetricAccumulator)) {
            throw new RuntimeException("Trying to merge incompatible accumulators: " + this + " with " + other);
        }
        AvgKafkaMetricAccumulator otherMetric = (AvgKafkaMetricAccumulator) other;

        AvgSumCount thisAvg = this.lastSumCount != null
                ? this.lastSumCount
                : AvgKafkaMetricAccumulator.readSumCount(this.kafkaMetric);
        AvgSumCount otherAvg = otherMetric.lastSumCount != null
                ? otherMetric.lastSumCount
                : AvgKafkaMetricAccumulator.readSumCount(otherMetric.kafkaMetric);

        thisAvg.count += otherAvg.count;
        thisAvg.sum += otherAvg.sum;
        // Keep the merged totals; without this, a freshly extracted pair would be discarded
        // and the next merge would lose everything merged so far.
        this.lastSumCount = thisAvg;
        this.mergedValue = thisAvg.sum / (double) thisAvg.count;
    }

    /**
     * Creates an independent copy of this accumulator.
     *
     * <p>The sum/count holder is deep-copied: {@link #merge(Accumulator)} mutates it in
     * place, so sharing the instance would let merges on the clone alter this accumulator.
     *
     * @return a new accumulator backed by the same Kafka metric with copied merge state
     */
    @Override
    public Accumulator<Void, Double> clone() {
        AvgKafkaMetricAccumulator clone = new AvgKafkaMetricAccumulator(this.kafkaMetric);
        clone.lastSumCount = AvgKafkaMetricAccumulator.copyOf(this.lastSumCount);
        clone.isMerged = this.isMerged;
        clone.mergedValue = this.mergedValue;
        return clone;
    }

    /**
     * Returns a copy of the given holder, or {@code null} if the argument is {@code null}.
     */
    private static AvgSumCount copyOf(AvgSumCount source) {
        if (source == null) {
            return null;
        }
        AvgSumCount copy = new AvgSumCount();
        copy.sum = source.sum;
        copy.count = source.count;
        return copy;
    }

    /**
     * Reads the current sum and event count from the {@link Avg} behind the given metric.
     *
     * @throws RuntimeException if the metric's measurable is not an {@link Avg} or the
     *     reflective extraction fails
     */
    private static AvgSumCount readSumCount(KafkaMetric metric) {
        Measurable measurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(metric);
        if (!(measurable instanceof Avg)) {
            throw new RuntimeException("Must be of type Avg");
        }
        return AvgKafkaMetricAccumulator.getAvgSumCount((Avg) measurable);
    }

    /**
     * Sums the {@code value} and {@code eventCount} of every sample held by the given
     * {@link Avg}, using reflection on the {@code samples} field of {@link SampledStat} and
     * on the fields of its nested {@code Sample} class.
     *
     * @param avg the Kafka statistic to inspect
     * @return the total sum and event count over all samples
     * @throws RuntimeException wrapping any failure during the reflective access
     */
    private static AvgSumCount getAvgSumCount(Avg avg) {
        try {
            Class<?> sampleClass = Class.forName("org.apache.kafka.common.metrics.stats.SampledStat$Sample");
            Field samplesField = SampledStat.class.getDeclaredField("samples");
            Field sampleValue = sampleClass.getDeclaredField("value");
            Field sampleEventCount = sampleClass.getDeclaredField("eventCount");
            samplesField.setAccessible(true);
            sampleValue.setAccessible(true);
            sampleEventCount.setAccessible(true);
            List<?> samples = (List<?>) samplesField.get(avg);
            AvgSumCount res = new AvgSumCount();
            // Indexed access is kept on purpose (no iterator): the list is owned by Kafka.
            // NOTE(review): it may be modified by other threads while we read — confirm.
            for (int i = 0; i < samples.size(); ++i) {
                Object sample = samples.get(i);
                res.sum += ((Double) sampleValue.get(sample)).doubleValue();
                res.count += ((Long) sampleEventCount.get(sample)).longValue();
            }
            return res;
        }
        catch (Throwable t) {
            // Deliberately broad: any failure while poking at Kafka internals is reported
            // with the same actionable message.
            throw new RuntimeException("Unable to extract sum and count from Avg using reflection. You can turn off the metrics from Flink's Kafka connector if this issue persists.", t);
        }
    }

    /**
     * Serialization hook: snapshots the current sum/count of the Kafka metric into
     * {@link #lastSumCount} (replacing any previous value) before writing the default
     * serialized form.
     *
     * @throws RuntimeException if the metric's measurable is not an {@link Avg} or the
     *     reflective extraction fails
     */
    private void writeObject(ObjectOutputStream out) throws IOException {
        this.lastSumCount = AvgKafkaMetricAccumulator.readSumCount(this.kafkaMetric);
        out.defaultWriteObject();
    }

    /** Deserialization hook: restores the default serialized form. */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
    }

    /** Mutable holder for a running sum and the number of events that contributed to it. */
    private static class AvgSumCount
    implements Serializable {
        double sum;
        long count;

        private AvgSumCount() {
        }

        @Override
        public String toString() {
            return "AvgSumCount{sum=" + this.sum + ", count=" + this.count + ", avg=" + this.sum / (double)this.count + "}";
        }
    }
}

