package org.apache.eagle.policy.executor;

import com.codahale.metrics.MetricRegistry;
import com.sun.jersey.client.impl.CopyOnWriteHashMap;
import com.typesafe.config.Config;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
import org.apache.eagle.dataproc.core.ValuesArray;
import org.apache.eagle.datastream.Collector;
import org.apache.eagle.datastream.JavaStormStreamExecutor2;
import org.apache.eagle.metric.reportor.EagleCounterMetric;
import org.apache.eagle.metric.reportor.EagleMetricListener;
import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder;
import org.apache.eagle.policy.DynamicPolicyLoader;
import org.apache.eagle.policy.PolicyDistributionReportMethods;
import org.apache.eagle.policy.PolicyDistroStatsLogReporter;
import org.apache.eagle.policy.PolicyEvaluationContext;
import org.apache.eagle.policy.PolicyEvaluator;
import org.apache.eagle.policy.PolicyLifecycleMethods;
import org.apache.eagle.policy.PolicyManager;
import org.apache.eagle.policy.PolicyPartitioner;
import org.apache.eagle.policy.ResultRender;
import org.apache.eagle.policy.common.Constants;
import org.apache.eagle.policy.config.AbstractPolicyDefinition;
import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
import org.apache.eagle.policy.siddhi.StreamMetadataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/eagle/policy/executor/PolicyProcessExecutor.class */
public abstract class PolicyProcessExecutor<T extends AbstractPolicyDefinitionEntity, K> extends JavaStormStreamExecutor2<String, K> implements PolicyLifecycleMethods<T>, PolicyDistributionReportMethods, IPolicyExecutor<T, K> {
    private static final long serialVersionUID = 1;
    public static final String EAGLE_EVENT_COUNT = "eagle.event.count";
    public static final String EAGLE_POLICY_EVAL_COUNT = "eagle.policy.eval.count";
    public static final String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count";
    public static final String EAGLE_ALERT_COUNT = "eagle.alert.count";
    public static final String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count";
    private final Class<T> policyDefinitionClz;
    private String executorId;
    private volatile CopyOnWriteHashMap<String, PolicyEvaluator<T>> policyEvaluators;
    private PolicyPartitioner partitioner;
    private int numPartitions;
    private int partitionSeq;
    private Config config;
    private Map<String, Map<String, T>> initialAlertDefs;
    private String[] sourceStreams;
    private Map<String, Map<String, String>> dimensionsMap;
    private Map<String, String> baseDimensions;
    private MetricRegistry registry;
    private EagleMetricListener listener;
    private PolicyDefinitionDAO<T> policyDefinitionDao;
    private static final Logger LOG = LoggerFactory.getLogger(PolicyProcessExecutor.class);
    private static long MERITE_GRANULARITY = 60000;

    public PolicyProcessExecutor(String str, PolicyPartitioner policyPartitioner, int i, int i2, PolicyDefinitionDAO<T> policyDefinitionDAO, String[] strArr, Class<T> cls) {
        this.executorId = str;
        this.partitioner = policyPartitioner;
        this.numPartitions = i;
        this.partitionSeq = i2;
        this.policyDefinitionDao = policyDefinitionDAO;
        this.sourceStreams = strArr;
        this.policyDefinitionClz = cls;
    }

    @Override // org.apache.eagle.policy.executor.IPolicyExecutor
    public String getExecutorId() {
        return this.executorId;
    }

    public int getNumPartitions() {
        return this.numPartitions;
    }

    @Override // org.apache.eagle.policy.executor.IPolicyExecutor
    public int getPartitionSeq() {
        return this.partitionSeq;
    }

    public PolicyPartitioner getPolicyPartitioner() {
        return this.partitioner;
    }

    public Map<String, Map<String, T>> getInitialAlertDefs() {
        return this.initialAlertDefs;
    }

    public PolicyDefinitionDAO<T> getPolicyDefinitionDao() {
        return this.policyDefinitionDao;
    }

    public Map<String, PolicyEvaluator<T>> getPolicyEvaluators() {
        return this.policyEvaluators;
    }

    public void prepareConfig(Config config) {
        this.config = config;
    }

    private void initMetricReportor() {
        String string = this.config.getString("eagleProps.eagleService.host");
        int i = this.config.getInt("eagleProps.eagleService.port");
        String string2 = this.config.hasPath("eagleProps.eagleService.username") ? this.config.getString("eagleProps.eagleService.username") : null;
        String string3 = this.config.hasPath("eagleProps.eagleService.password") ? this.config.getString("eagleProps.eagleService.password") : null;
        this.registry = new MetricRegistry();
        this.listener = new EagleServiceReporterMetricListener(string, i, string2, string3);
        this.baseDimensions = new HashMap();
        this.baseDimensions = new HashMap();
        this.baseDimensions.put(Constants.ALERT_EXECUTOR_ID, this.executorId);
        this.baseDimensions.put(Constants.PARTITIONSEQ, String.valueOf(this.partitionSeq));
        this.baseDimensions.put(Constants.SOURCE, ManagementFactory.getRuntimeMXBean().getName());
        this.baseDimensions.put("application", this.config.getString("eagleProps.application"));
        this.baseDimensions.put("site", this.config.getString("eagleProps.site"));
        this.dimensionsMap = new HashMap();
    }

    public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config) {
        return new AlertStreamSchemaDAOImpl(config);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void init() {
        StreamMetadataManager.getInstance().init(this.config, getAlertStreamSchemaDAO(this.config));
        HashMap hashMap = new HashMap();
        String string = this.config.getString("eagleProps.site");
        String string2 = this.config.getString("eagleProps.application");
        try {
            this.initialAlertDefs = this.policyDefinitionDao.findActivePoliciesGroupbyExecutorId(string, string2);
            if (this.initialAlertDefs == null || this.initialAlertDefs.isEmpty()) {
                LOG.warn("No alert definitions was found for site: " + string + ", application: " + string2);
            } else if (this.initialAlertDefs.get(this.executorId) != null) {
                for (T t : this.initialAlertDefs.get(this.executorId).values()) {
                    if (this.partitioner.partition(this.numPartitions, (String) t.getTags().get(Constants.POLICY_TYPE), (String) t.getTags().get(Constants.POLICY_ID)) == this.partitionSeq) {
                        hashMap.put(t.getTags().get(Constants.POLICY_ID), createPolicyEvaluator(t));
                    }
                }
            }
            this.policyEvaluators = new CopyOnWriteHashMap<>();
            this.policyEvaluators.putAll(hashMap);
            DynamicPolicyLoader instanceOf = DynamicPolicyLoader.getInstanceOf(this.policyDefinitionClz);
            instanceOf.init(this.initialAlertDefs, this.policyDefinitionDao, this.config);
            String str = this.executorId + "_" + this.partitionSeq;
            instanceOf.addPolicyChangeListener(str, this);
            instanceOf.addPolicyDistributionReporter(str, this);
            LOG.info("Alert Executor created, partitionSeq: " + this.partitionSeq + " , numPartitions: " + this.numPartitions);
            LOG.info("All policy evaluators: " + this.policyEvaluators);
            initMetricReportor();
        } catch (Exception e) {
            LOG.error("fail to initialize initialAlertDefs: ", e);
            throw new IllegalStateException("fail to initialize initialAlertDefs: ", e);
        }
    }

    protected PolicyEvaluator<T> createPolicyEvaluator(T t) {
        String str = (String) t.getTags().get(Constants.POLICY_TYPE);
        Class<? extends PolicyEvaluator> policyEvaluator = PolicyManager.getInstance().getPolicyEvaluator(str);
        if (policyEvaluator == null) {
            String str2 = "No policy evaluator defined for policy type : " + str;
            LOG.error(str2);
            throw new IllegalStateException(str2);
        }
        String str3 = "alertExecutorConfigs." + this.executorId + ".needValidation";
        boolean z = !this.config.hasPath(str3) || this.config.getBoolean(str3);
        try {
            AbstractPolicyDefinition abstractPolicyDefinition = (AbstractPolicyDefinition) JsonSerDeserUtils.deserialize(t.getPolicyDef(), AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(str));
            PolicyEvaluationContext policyEvaluationContext = new PolicyEvaluationContext();
            policyEvaluationContext.policyId = (String) t.getTags().get(Constants.POLICY_ID);
            policyEvaluationContext.alertExecutor = this;
            policyEvaluationContext.resultRender = getResultRender();
            PolicyEvaluator<T> newInstance = policyEvaluator.getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, Boolean.TYPE).newInstance(this.config, policyEvaluationContext, abstractPolicyDefinition, this.sourceStreams, Boolean.valueOf(z));
            if (newInstance.isMarkdownEnabled()) {
                updateMarkdownDetails(t, newInstance.isMarkdownEnabled(), newInstance.getMarkdownReason());
            }
            return newInstance;
        } catch (Exception e) {
            LOG.error("Fail creating new policyEvaluator", e);
            LOG.warn("Broken policy definition and stop running : " + t.getPolicyDef());
            throw new IllegalStateException(e);
        }
    }

    private boolean accept(T t) {
        if ((t.getTags().containsKey(Constants.EXECUTOR_ID) ? (String) t.getTags().get(Constants.EXECUTOR_ID) : (String) t.getTags().get(Constants.ALERT_EXECUTOR_ID)).equals(this.executorId)) {
            return this.partitioner.partition(this.numPartitions, (String) t.getTags().get(Constants.POLICY_TYPE), (String) t.getTags().get(Constants.POLICY_ID)) == this.partitionSeq;
        }
        if (!LOG.isDebugEnabled()) {
            return false;
        }
        LOG.debug("alertDef does not belong to this alertExecutorId : " + this.executorId + ", alertDef : " + t);
        return false;
    }

    private void updateCounter(String str, Map<String, String> map, double d) {
        long currentTimeMillis = System.currentTimeMillis();
        String codeMetricKey = MetricKeyCodeDecoder.codeMetricKey(str, map);
        if (this.registry.getMetrics().get(codeMetricKey) != null) {
            ((EagleCounterMetric) this.registry.getMetrics().get(codeMetricKey)).update(d, currentTimeMillis);
            return;
        }
        EagleCounterMetric eagleCounterMetric = new EagleCounterMetric(currentTimeMillis, codeMetricKey, d, MERITE_GRANULARITY);
        eagleCounterMetric.registerListener(this.listener);
        this.registry.register(codeMetricKey, eagleCounterMetric);
    }

    private void updateCounter(String str, Map<String, String> map) {
        updateCounter(str, map, 1.0d);
    }

    protected Map<String, String> getDimensions(String str) {
        if (this.dimensionsMap.get(str) == null) {
            HashMap hashMap = new HashMap(this.baseDimensions);
            hashMap.put(Constants.POLICY_ID, str);
            this.dimensionsMap.put(str, hashMap);
        }
        return this.dimensionsMap.get(str);
    }

    public void flatMap(List<Object> list, Collector<Tuple2<String, K>> collector) {
        if (list.size() != 3) {
            throw new IllegalStateException("AlertExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Msg is coming " + list.get(2));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Current policyEvaluators: " + this.policyEvaluators.keySet().toString());
        }
        updateCounter(EAGLE_EVENT_COUNT, this.baseDimensions);
        try {
            synchronized (this.policyEvaluators) {
                for (Map.Entry entry : this.policyEvaluators.entrySet()) {
                    String str = (String) entry.getKey();
                    PolicyEvaluator policyEvaluator = (PolicyEvaluator) entry.getValue();
                    if (!policyEvaluator.isMarkdownEnabled()) {
                        updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(str));
                        try {
                            policyEvaluator.evaluate(new ValuesArray(new Object[]{collector, list.get(1), list.get(2)}));
                        } catch (Exception e) {
                            LOG.error("Got an exception, but continue to run " + list.get(2).toString(), e);
                            updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(str));
                        }
                    }
                }
            }
        } catch (Exception e2) {
            LOG.error(this.executorId + ", partition " + this.partitionSeq + ", error fetching alerts, but continue to run", e2);
            updateCounter(EAGLE_ALERT_FAIL_COUNT, this.baseDimensions);
        }
    }

    @Override // org.apache.eagle.policy.PolicyLifecycleMethods
    public void onPolicyCreated(Map<String, T> map) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.executorId + ", partition " + this.partitionSeq + " policy added : " + map + " policyEvaluators " + this.policyEvaluators);
        }
        for (T t : map.values()) {
            if (accept(t)) {
                LOG.info(this.executorId + ", partition " + this.partitionSeq + " policy really added " + t);
                PolicyEvaluator<T> createPolicyEvaluator = createPolicyEvaluator(t);
                if (createPolicyEvaluator != null) {
                    synchronized (this.policyEvaluators) {
                        this.policyEvaluators.put(t.getTags().get(Constants.POLICY_ID), createPolicyEvaluator);
                    }
                } else {
                    continue;
                }
            }
        }
    }

    @Override // org.apache.eagle.policy.PolicyLifecycleMethods
    public void onPolicyChanged(Map<String, T> map) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.executorId + ", partition " + this.partitionSeq + " policy changed : " + map);
        }
        for (T t : map.values()) {
            if (accept(t)) {
                LOG.info(this.executorId + ", partition " + this.partitionSeq + " policy really changed " + t);
                synchronized (this.policyEvaluators) {
                    PolicyEvaluator policyEvaluator = (PolicyEvaluator) this.policyEvaluators.get(t.getTags().get(Constants.POLICY_ID));
                    boolean isMarkdownEnabled = policyEvaluator.isMarkdownEnabled();
                    String markdownReason = policyEvaluator.getMarkdownReason();
                    policyEvaluator.onPolicyUpdate(t);
                    if (isMarkdownUpdateRequired(isMarkdownEnabled, policyEvaluator.isMarkdownEnabled(), markdownReason, policyEvaluator.getMarkdownReason())) {
                        updateMarkdownDetails(t, policyEvaluator.isMarkdownEnabled(), policyEvaluator.getMarkdownReason());
                    }
                }
            }
        }
    }

    @Override // org.apache.eagle.policy.PolicyLifecycleMethods
    public void onPolicyDeleted(Map<String, T> map) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.executorId + ", partition " + this.partitionSeq + " policy deleted : " + map);
        }
        for (T t : map.values()) {
            if (accept(t)) {
                LOG.info(this.executorId + ", partition " + this.partitionSeq + " policy really deleted " + t);
                String str = (String) t.getTags().get(Constants.POLICY_ID);
                synchronized (this.policyEvaluators) {
                    if (this.policyEvaluators.containsKey(str)) {
                        ((PolicyEvaluator) this.policyEvaluators.remove(t.getTags().get(Constants.POLICY_ID))).onPolicyDelete();
                    }
                }
            }
        }
    }

    @Override // org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler
    public void onEvalEvents(PolicyEvaluationContext<T, K> policyEvaluationContext, List<K> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        String str = policyEvaluationContext.policyId;
        LOG.info(String.format("Detected %d alerts for policy %s", Integer.valueOf(list.size()), str));
        Collector collector = policyEvaluationContext.outputCollector;
        PolicyEvaluator<T> policyEvaluator = policyEvaluationContext.evaluator;
        updateCounter(EAGLE_ALERT_COUNT, getDimensions(str), list.size());
        for (K k : list) {
            synchronized (this) {
                collector.collect(new Tuple2(str, k));
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("A new alert is triggered: " + this.executorId + ", partition " + this.partitionSeq + ", Got an alert with output context: " + k + ", for policy " + policyEvaluator);
            }
        }
    }

    public abstract ResultRender<T, K> getResultRender();

    @Override // org.apache.eagle.policy.PolicyDistributionReportMethods
    public void report() {
        new PolicyDistroStatsLogReporter().reportPolicyMembership(this.executorId + "_" + this.partitionSeq, this.policyEvaluators.keySet());
    }

    private boolean isMarkdownUpdateRequired(boolean z, boolean z2, String str, String str2) {
        boolean z3 = true;
        if (!z && !z2) {
            z3 = false;
        } else if (z && z2 && str.equals(str2)) {
            z3 = false;
        }
        return z3;
    }

    private void updateMarkdownDetails(T t, boolean z, String str) {
        AlertDefinitionAPIEntity alertDefinitionAPIEntity = (AlertDefinitionAPIEntity) t;
        alertDefinitionAPIEntity.setMarkdownEnabled(z);
        alertDefinitionAPIEntity.setMarkdownReason(null != str ? str : "");
        this.policyDefinitionDao.updatePolicyDetails(t);
    }
}
