package org.apache.eagle.dataproc.impl.aggregate;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
import org.apache.eagle.dataproc.core.ValuesArray;
import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
import org.apache.eagle.datastream.Collector;
import org.apache.eagle.datastream.JavaStormStreamExecutor2;
import org.apache.eagle.policy.PolicyEvaluationContext;
import org.apache.eagle.policy.PolicyEvaluator;
import org.apache.eagle.policy.PolicyManager;
import org.apache.eagle.policy.common.Constants;
import org.apache.eagle.policy.config.AbstractPolicyDefinition;
import org.apache.eagle.policy.executor.IPolicyExecutor;
import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.jasper.compiler.TagConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.class */
/**
 * Stream executor that evaluates one anonymous aggregation policy built from a query string.
 *
 * <p>The constructor wraps the query in an {@link AggregateDefinitionAPIEntity}; {@link #init()}
 * turns that definition into a {@link PolicyEvaluator}; {@link #flatMap(List, Collector)} feeds
 * incoming tuples to the evaluator; and {@link #onEvalEvents(PolicyEvaluationContext, List)}
 * emits every resulting {@link AggregateEntity} paired with the policy id held by the
 * evaluation context.
 */
public class SimpleAggregateExecutor extends JavaStormStreamExecutor2<String, AggregateEntity> implements SiddhiEvaluationHandler<AggregateDefinitionAPIEntity, AggregateEntity>, IPolicyExecutor<AggregateDefinitionAPIEntity, AggregateEntity> {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleAggregateExecutor.class);

    /** Query text stored under the {@code "expression"} key of the generated policy definition. */
    private final String cql;
    /** Partition sequence reported by {@link #getPartitionSeq()}. */
    private final int partitionSeq;
    /** Total partition count as given to the constructor; not read anywhere else in this class. */
    private final int totalPartitionNum;
    /** Upstream stream names handed to the policy evaluator's constructor. */
    private final String[] upStreamNames;
    /**
     * Id derived from the MD5 of the query.
     * NOTE(review): assigned in the constructor but never read in this class.
     */
    private final String policyId;
    /** Id derived from the MD5 of the query; returned by {@link #getExecutorId()}. */
    private final String executorId;
    /** Configuration received through {@link #prepareConfig(Config)}. */
    private Config config;
    /** The anonymous aggregation definition populated by the constructor. */
    private final AggregateDefinitionAPIEntity aggDef = new AggregateDefinitionAPIEntity();
    /** Evaluator created in {@link #init()}. */
    private PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator;

    /**
     * Builds the executor and its anonymous aggregation definition.
     *
     * @param strArr upstream stream names, later passed to the policy evaluator
     * @param str    the aggregation query; must not be {@code null} because its bytes are hashed
     * @param str2   policy type, stored in the definition's tags under {@link Constants#POLICY_TYPE}
     * @param i      partition sequence of this executor
     * @param i2     total number of partitions
     */
    public SimpleAggregateExecutor(String[] strArr, String str, String str2, int i, int i2) {
        this.cql = str;
        this.partitionSeq = i;
        this.upStreamNames = strArr;
        this.totalPartitionNum = i2;

        this.aggDef.setTags(new HashMap<String, String>());
        this.aggDef.getTags().put(Constants.POLICY_TYPE, str2);

        try {
            HashMap<String, Object> template = new HashMap<String, Object>();
            template.put("type", "siddhiCEPEngine");
            // Plain literal instead of a JSP compiler constant: the key belongs to the policy
            // definition format, and the literal carries the same value ("expression").
            template.put("expression", this.cql);
            template.put("containsDefinition", true);
            this.aggDef.setPolicyDef(new ObjectMapper().writer().writeValueAsString(template));
        } catch (Exception e) {
            // Best effort: the failure is logged and the policy definition is simply left unset.
            LOG.error("Simple aggregate generate policy definition failed!", e);
        }

        // One timestamp so that the creation and modification times always agree.
        long now = System.currentTimeMillis();
        this.aggDef.setCreatedTime(now);
        this.aggDef.setLastModifiedDate(now);
        this.aggDef.setName("anonymous-aggregation-def");
        this.aggDef.setOwner("anonymous");
        this.aggDef.setEnabled(true);
        this.aggDef.setDescription("anonymous aggregation definition");

        // Explicit charset so the derived ids do not depend on the JVM's default encoding.
        String queryDigest = MD5Hash.getMD5AsHex(str.getBytes(StandardCharsets.UTF_8));
        this.policyId = "anonymousAggregatePolicyId-" + queryDigest;
        this.executorId = "anonymousAggregateId-" + queryDigest;
    }

    /**
     * Stores the configuration that is later passed to the policy evaluator.
     *
     * @param config configuration to keep
     */
    @Override
    public void prepareConfig(Config config) {
        this.config = config;
    }

    /**
     * Creates the policy evaluator for the anonymous aggregation definition.
     *
     * @throws IllegalStateException if no evaluator is available or it cannot be instantiated
     */
    @Override
    public void init() {
        this.evaluator = createPolicyEvaluator(this.aggDef);
    }

    /**
     * Instantiates, by reflection, the evaluator class registered for the definition's policy type.
     *
     * @param aggregateDefinitionAPIEntity definition whose tags supply the policy type and whose
     *                                     policy definition JSON is deserialized for the evaluator
     * @return the newly constructed evaluator
     * @throws IllegalStateException if no evaluator class is registered for the policy type, or
     *                               if constructing the evaluator fails
     */
    protected PolicyEvaluator<AggregateDefinitionAPIEntity> createPolicyEvaluator(AggregateDefinitionAPIEntity aggregateDefinitionAPIEntity) {
        String policyType = aggregateDefinitionAPIEntity.getTags().get(Constants.POLICY_TYPE);
        Class<? extends PolicyEvaluator> evaluatorClass = PolicyManager.getInstance().getPolicyEvaluator(policyType);
        if (evaluatorClass == null) {
            String message = "No policy evaluator defined for policy type : " + policyType;
            LOG.error(message);
            throw new IllegalStateException(message);
        }

        // A deserialization failure is only logged; the evaluator constructor then receives null.
        AbstractPolicyDefinition policyDefinition = null;
        try {
            policyDefinition = (AbstractPolicyDefinition) JsonSerDeserUtils.deserialize(aggregateDefinitionAPIEntity.getPolicyDef(), AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(policyType));
        } catch (Exception e) {
            LOG.error("Fail initial alert policy def: " + aggregateDefinitionAPIEntity.getPolicyDef(), e);
        }

        PolicyEvaluationContext context = new PolicyEvaluationContext();
        // NOTE(review): the constructor of this class only populates the POLICY_TYPE tag, so for
        // the anonymous definition this lookup presumably yields null - confirm whether the
        // policyId field computed in the constructor was meant to be used here.
        context.policyId = aggregateDefinitionAPIEntity.getTags().get(Constants.POLICY_ID);
        context.alertExecutor = this;
        context.resultRender = new AggregateResultRender();

        try {
            return evaluatorClass.getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, Boolean.TYPE).newInstance(this.config, context, policyDefinition, this.upStreamNames, false);
        } catch (Exception e) {
            LOG.error("Fail creating new policyEvaluator", e);
            LOG.warn("Broken policy definition and stop running : " + aggregateDefinitionAPIEntity.getPolicyDef());
            throw new IllegalStateException(e);
        }
    }

    /**
     * Passes one incoming tuple to the policy evaluator.
     *
     * <p>Evaluation failures are logged and swallowed so that processing continues.
     *
     * @param list      the incoming fields; exactly three are required, of which the second and
     *                  third are forwarded to the evaluator
     * @param collector output collector forwarded to the evaluator together with the fields
     * @throws IllegalStateException if {@code list} does not contain exactly three elements
     */
    @Override
    public void flatMap(List<Object> list, Collector<Tuple2<String, AggregateEntity>> collector) {
        if (list.size() != 3) {
            throw new IllegalStateException("AggregateExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
        }
        Object payload = list.get(2);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Msg is coming " + payload);
            LOG.debug("Current policyEvaluators: " + this.evaluator);
        }
        try {
            this.evaluator.evaluate(new ValuesArray(collector, list.get(1), payload));
        } catch (Exception e) {
            // Concatenation is null-safe; calling toString() on a null payload here would throw
            // from inside the handler and defeat the "continue to run" intent.
            LOG.error("Got an exception, but continue to run " + payload, e);
        }
    }

    /**
     * Emits every evaluation result through the context's output collector.
     *
     * <p>Each entity is collected as a {@code Tuple2} of the context's policy id and the entity,
     * while holding this executor's monitor. A {@code null} or empty list is ignored.
     *
     * @param policyEvaluationContext context supplying the policy id, output collector and evaluator
     * @param list                    entities produced by the evaluation; may be {@code null}
     */
    @Override
    public void onEvalEvents(PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> policyEvaluationContext, List<AggregateEntity> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        String contextPolicyId = policyEvaluationContext.policyId;
        LOG.info("Detected {} alerts for policy {}", list.size(), contextPolicyId);
        Collector outputCollector = policyEvaluationContext.outputCollector;
        PolicyEvaluator<AggregateDefinitionAPIEntity> contextEvaluator = policyEvaluationContext.evaluator;
        for (AggregateEntity entity : list) {
            // NOTE(review): presumably guards the collector against concurrent callbacks - confirm.
            synchronized (this) {
                outputCollector.collect(new Tuple2(contextPolicyId, entity));
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("A new alert is triggered: " + this.executorId + ", partition " + this.partitionSeq + ", Got an alert with output context: " + entity + ", for policy " + contextEvaluator);
            }
        }
    }

    /**
     * Returns the executor id derived from the query's MD5 digest.
     *
     * @return the executor id
     */
    @Override
    public String getExecutorId() {
        return this.executorId;
    }

    /**
     * Returns the partition sequence given to the constructor.
     *
     * @return the partition sequence
     */
    @Override
    public int getPartitionSeq() {
        return this.partitionSeq;
    }
}
