package org.apache.gobblin.completeness.verifier;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.gobblin.completeness.audit.AuditCountClient;
import org.apache.gobblin.completeness.audit.AuditCountClientFactory;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.ClassAliasResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.class */
/**
 * Decides whether a dataset is complete over a range by comparing the audit count of a source
 * tier with the audit counts of one or more reference tiers.
 *
 * <p>Audit counts are fetched per tier from an {@link AuditCountClient}. The completeness value is
 * the largest {@code sourceCount / referenceCount} ratio over all configured reference tiers, and
 * the dataset is reported complete when that ratio is strictly greater than a threshold.
 *
 * <p>Configuration read from the {@link State} passed to the constructor:
 * <ul>
 *   <li>{@link #SOURCE_TIER} - name of the source tier.</li>
 *   <li>{@link #REFERENCE_TIERS} - comma-separated reference tier names; entries are trimmed and
 *       empty entries are dropped.</li>
 *   <li>{@link #THRESHOLD} - default threshold, {@code 0.999} when not set.</li>
 *   <li>{@link #COMPLETE_ON_NO_COUNTS} - when {@code true}, an empty audit count map is treated
 *       as complete; {@code false} when not set.</li>
 * </ul>
 */
public class KafkaAuditCountVerifier {
    private static final Logger log = LoggerFactory.getLogger(KafkaAuditCountVerifier.class);
    public static final String COMPLETENESS_PREFIX = "completeness.";
    public static final String SOURCE_TIER = "completeness.source.tier";
    public static final String REFERENCE_TIERS = "completeness.reference.tiers";
    public static final String THRESHOLD = "completeness.threshold";
    private static final double DEFAULT_THRESHOLD = 0.999d;
    public static final String COMPLETE_ON_NO_COUNTS = "completeness.complete.on.no.counts";
    private final boolean returnCompleteOnNoCounts;
    private final AuditCountClient auditCountClient;
    private final String srcTier;
    private final Collection<String> refTiers;
    private final double threshold;

    /**
     * Creates a verifier whose audit count client is built by the factory named in
     * {@link AuditCountClientFactory#AUDIT_COUNT_CLIENT_FACTORY}.
     *
     * @param state configuration; must contain the audit count client factory property
     * @throws IllegalArgumentException if the factory property is not set
     * @throws RuntimeException if the factory cannot be resolved, instantiated, or fails to
     *     create a client
     */
    public KafkaAuditCountVerifier(State state) {
        this(state, getAuditClient(state));
    }

    /**
     * Creates a verifier that uses the supplied audit count client.
     *
     * @param state configuration holding the tier names, threshold and empty-counts policy
     * @param auditCountClient client used to fetch per-tier audit counts
     * @throws NullPointerException if {@link #REFERENCE_TIERS} is not set
     */
    public KafkaAuditCountVerifier(State state, AuditCountClient auditCountClient) {
        this.auditCountClient = auditCountClient;
        this.threshold = state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
        this.srcTier = state.getProp(SOURCE_TIER);
        // Splitter rejects a null input with a message-less NullPointerException; fail with the
        // same exception type but say which property is missing.
        String refTiersProp = Preconditions.checkNotNull(state.getProp(REFERENCE_TIERS),
                "Reference tiers property " + REFERENCE_TIERS + " is not set");
        this.refTiers = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(refTiersProp);
        this.returnCompleteOnNoCounts = state.getPropAsBoolean(COMPLETE_ON_NO_COUNTS, false);
    }

    /**
     * Resolves the configured {@link AuditCountClientFactory} (by alias or class name),
     * instantiates it through its no-argument constructor and asks it for a client.
     */
    private static AuditCountClient getAuditClient(State state) {
        Preconditions.checkArgument(state.contains(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY), String.format("Audit count factory %s not set ", AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY));
        try {
            String factoryName = state.getProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY);
            ClassAliasResolver<AuditCountClientFactory> resolver = new ClassAliasResolver<>(AuditCountClientFactory.class);
            // Class.newInstance() is deprecated; go through the constructor explicitly.
            AuditCountClientFactory factory = resolver.resolveClass(factoryName).getDeclaredConstructor().newInstance();
            return factory.createAuditCountClient(state);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Checks completeness against an explicit threshold.
     *
     * @param str dataset name passed to the audit count client
     * @param j start of the range passed to the audit count client
     * @param j2 end of the range passed to the audit count client
     * @param d threshold; the dataset is complete only when the computed ratio is strictly
     *     greater than this value
     * @return {@code true} if the completeness ratio exceeds {@code d}
     * @throws IOException if a required tier count is missing, a reference count is not
     *     positive, or no ratio can be computed
     */
    public boolean isComplete(String str, long j, long j2, double d) throws IOException {
        return getCompletenessPercentage(str, j, j2) > d;
    }

    /**
     * Checks completeness against the threshold configured through {@link #THRESHOLD}.
     *
     * @param str dataset name passed to the audit count client
     * @param j start of the range passed to the audit count client
     * @param j2 end of the range passed to the audit count client
     * @return {@code true} if the completeness ratio exceeds the configured threshold
     * @throws IOException if a required tier count is missing, a reference count is not
     *     positive, or no ratio can be computed
     */
    public boolean isComplete(String str, long j, long j2) throws IOException {
        return isComplete(str, j, j2, this.threshold);
    }

    /**
     * Computes the completeness value as a ratio (not scaled to 100): the maximum of
     * {@code sourceCount / referenceCount} over all reference tiers.
     *
     * <p>Returns {@code 1.0} without inspecting any tier when the fetched map is empty and the
     * verifier is configured to treat missing counts as complete.
     *
     * @throws IOException if the source tier or any reference tier has no count, a reference
     *     count is zero or negative, or there are no reference tiers to compare against
     */
    private double getCompletenessPercentage(String datasetName, long rangeStart, long rangeEnd) throws IOException {
        Map<String, Long> countsByTier = getTierAndCount(datasetName, rangeStart, rangeEnd);
        log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, rangeStart, rangeEnd));
        countsByTier.forEach((tier, count) -> log.info(String.format(" %s : %s ", tier, count)));
        if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
            log.info(String.format("Found empty counts map for %s, returning complete", datasetName));
            return 1.0d;
        }
        if (!countsByTier.containsKey(this.srcTier)) {
            throw new IOException(String.format("Source tier %s audit count cannot be retrieved for dataset %s between %s and %s", this.srcTier, datasetName, rangeStart, rangeEnd));
        }
        // Looked up once; unboxed only inside the loop so that reference-tier validation
        // still runs first, as before.
        final Long srcCount = countsByTier.get(this.srcTier);

        // Negative sentinel: stays negative only when there is no reference tier to compare with.
        double maxRatio = -1.0d;
        for (String refTier : this.refTiers) {
            if (!countsByTier.containsKey(refTier)) {
                throw new IOException(String.format("Reference tier %s audit count cannot be retrieved for dataset %s between %s and %s", refTier, datasetName, rangeStart, rangeEnd));
            }
            long refCount = countsByTier.get(refTier);
            if (refCount <= 0) {
                throw new IOException(String.format("Reference tier %s count cannot be less than or equal to zero", refTier));
            }
            // Divide in floating point: long / long would truncate the ratio to a whole number
            // (e.g. 9995 / 10000 == 0), making any sub-100% ratio look like 0% complete.
            maxRatio = Math.max(maxRatio, srcCount.doubleValue() / refCount);
        }
        if (maxRatio < 0.0d) {
            throw new IOException("Cannot calculate completion percentage");
        }
        return maxRatio;
    }

    /** Fetches the per-tier audit counts for the dataset and range from the audit count client. */
    private Map<String, Long> getTierAndCount(String datasetName, long rangeStart, long rangeEnd) throws IOException {
        return this.auditCountClient.fetch(datasetName, rangeStart, rangeEnd);
    }
}
