package org.apache.eagle.policy.siddhi;

import com.lmax.disruptor.ExceptionHandler;
import com.typesafe.config.Config;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
import org.apache.eagle.dataproc.core.ValuesArray;
import org.apache.eagle.datastream.Collector;
import org.apache.eagle.policy.PolicyEvaluationContext;
import org.apache.eagle.policy.PolicyEvaluator;
import org.apache.eagle.policy.PolicyManager;
import org.apache.eagle.policy.common.Constants;
import org.apache.eagle.policy.config.AbstractPolicyDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.execution.query.Query;
import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
import org.wso2.siddhi.query.compiler.exception.SiddhiParserException;

/* loaded from: input_file:org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.class */
public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K> implements PolicyEvaluator<T> {
    private static final String EXECUTION_PLAN_NAME = "query";
    private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class);
    private volatile SiddhiRuntime siddhiRuntime;
    private final String[] sourceStreams;
    private final boolean needValidation;
    private final Config config;
    private final PolicyEvaluationContext<T, K> context;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator$SiddhiPolicyExceptionHandler.class */
    public static class SiddhiPolicyExceptionHandler implements Serializable, ExceptionHandler<Object> {
        private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyExceptionHandler.class);

        private SiddhiPolicyExceptionHandler() {
        }

        public void handleEventException(Throwable th, long j, Object obj) {
            LOG.warn("Exception processing event: " + j + " " + obj, th);
        }

        public void handleOnStartException(Throwable th) {
            LOG.warn("Exception during onStart()", th);
        }

        public void handleOnShutdownException(Throwable th) {
            LOG.warn("Exception during onShutdown()", th);
        }
    }

    /* loaded from: input_file:org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator$SiddhiRuntime.class */
    public static class SiddhiRuntime {
        QueryCallback queryCallback;
        Map<String, InputHandler> siddhiInputHandlers;
        SiddhiManager siddhiManager;
        SiddhiPolicyDefinition policyDef;
        List<String> outputFields;
        String executionPlanName;
        boolean markdownEnabled;
        String markdownReason;
    }

    public SiddhiPolicyEvaluator(Config config, PolicyEvaluationContext<T, K> policyEvaluationContext, AbstractPolicyDefinition abstractPolicyDefinition, String[] strArr) {
        this(config, policyEvaluationContext, abstractPolicyDefinition, strArr, false);
    }

    public SiddhiPolicyEvaluator(Config config, PolicyEvaluationContext<T, K> policyEvaluationContext, AbstractPolicyDefinition abstractPolicyDefinition, String[] strArr, boolean z) {
        this.config = config;
        this.context = policyEvaluationContext;
        this.context.evaluator = this;
        this.needValidation = z;
        this.sourceStreams = strArr;
        init(abstractPolicyDefinition);
    }

    public void init(AbstractPolicyDefinition abstractPolicyDefinition) {
        this.siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition) abstractPolicyDefinition);
    }

    public static String addContextFieldIfNotExist(String str) {
        int indexOf = str.indexOf("select ") + 7;
        int i = indexOf;
        boolean z = true;
        while (true) {
            if (i >= str.length()) {
                break;
            }
            if (str.charAt(i) == ' ') {
                i++;
            } else if (str.charAt(i) != '*') {
                z = false;
            }
        }
        if (z) {
            return str;
        }
        return str.substring(0, indexOf) + "eagleAlertContext," + str.substring(indexOf, str.length());
    }

    private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition siddhiPolicyDefinition) {
        SiddhiManager siddhiManager = new SiddhiManager();
        HashMap hashMap = new HashMap();
        SiddhiRuntime siddhiRuntime = new SiddhiRuntime();
        String expression = siddhiPolicyDefinition.getExpression();
        if (!siddhiPolicyDefinition.isContainsDefinition()) {
            StringBuilder sb = new StringBuilder();
            for (String str : this.sourceStreams) {
                String convertToStreamDef = SiddhiStreamMetadataUtils.convertToStreamDef(str);
                LOG.info("Siddhi stream definition : " + convertToStreamDef);
                sb.append(convertToStreamDef);
            }
            expression = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + siddhiPolicyDefinition.getExpression();
        }
        ExecutionPlanRuntime executionPlanRuntime = null;
        try {
            executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(expression);
            executionPlanRuntime.handleExceptionWith(new SiddhiPolicyExceptionHandler());
            for (String str2 : this.sourceStreams) {
                hashMap.put(str2, executionPlanRuntime.getInputHandler(str2));
            }
            executionPlanRuntime.start();
            LOG.info("Siddhi query: " + expression);
            attachCallback(siddhiRuntime, executionPlanRuntime, this.context);
            siddhiRuntime.markdownEnabled = false;
            siddhiRuntime.markdownReason = null;
        } catch (SiddhiParserException e) {
            LOG.error("Exception in parsing Siddhi query: " + expression + ", reason being: " + e.getMessage());
            siddhiRuntime.queryCallback = null;
            siddhiRuntime.outputFields = null;
            siddhiRuntime.markdownEnabled = true;
            siddhiRuntime.markdownReason = e.getMessage();
        }
        siddhiRuntime.siddhiInputHandlers = hashMap;
        siddhiRuntime.siddhiManager = siddhiManager;
        siddhiRuntime.policyDef = siddhiPolicyDefinition;
        siddhiRuntime.executionPlanName = null != executionPlanRuntime ? executionPlanRuntime.getName() : null;
        return siddhiRuntime;
    }

    private void attachCallback(SiddhiRuntime siddhiRuntime, ExecutionPlanRuntime executionPlanRuntime, PolicyEvaluationContext<T, K> policyEvaluationContext) {
        ArrayList arrayList = new ArrayList();
        SiddhiQueryCallbackImpl siddhiQueryCallbackImpl = new SiddhiQueryCallbackImpl(this.config, policyEvaluationContext);
        executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, siddhiQueryCallbackImpl);
        siddhiRuntime.queryCallback = siddhiQueryCallbackImpl;
        try {
            Field declaredField = QueryCallback.class.getDeclaredField(EXECUTION_PLAN_NAME);
            declaredField.setAccessible(true);
            Iterator it = ((Query) declaredField.get(siddhiQueryCallbackImpl)).getSelector().getSelectionList().iterator();
            while (it.hasNext()) {
                arrayList.add(((OutputAttribute) it.next()).getRename());
            }
        } catch (Exception e) {
            LOG.error("Got an Exception when initial outputFields ", e);
        }
        siddhiRuntime.outputFields = arrayList;
    }

    @Override // org.apache.eagle.policy.PolicyEvaluator
    public void evaluate(ValuesArray valuesArray) throws Exception {
        if (this.siddhiRuntime.markdownEnabled) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Siddhi policy evaluator consumers data :" + valuesArray);
        }
        Collector collector = (Collector) valuesArray.get(0);
        String str = (String) valuesArray.get(1);
        SortedMap sortedMap = (SortedMap) valuesArray.get(2);
        Set<String> keySet = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(str).keySet();
        validateEventInRuntime(str, sortedMap, keySet);
        synchronized (this.siddhiRuntime) {
            this.context.outputCollector = collector;
            ArrayList arrayList = new ArrayList();
            putAttrsIntoInputStream(arrayList, str, keySet, sortedMap);
            this.siddhiRuntime.siddhiInputHandlers.get(str).send(arrayList.toArray(new Object[0]));
        }
    }

    private void validateEventInRuntime(String str, SortedMap sortedMap, Set<String> set) {
        if (this.needValidation && !set.equals(sortedMap.keySet())) {
            TreeSet treeSet = new TreeSet();
            for (K k : sortedMap.keySet()) {
                if (!set.contains(k)) {
                    treeSet.add(k);
                }
            }
            LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s", treeSet.toString(), sortedMap.toString(), str, set.toString()));
            Iterator it = treeSet.iterator();
            while (it.hasNext()) {
                sortedMap.remove(it.next());
            }
        }
    }

    private void putAttrsIntoInputStream(List<Object> list, String str, Set<String> set, SortedMap sortedMap) {
        if (!this.needValidation) {
            list.addAll(sortedMap.values());
            return;
        }
        for (String str2 : set) {
            Object obj = sortedMap.get(str2);
            if (obj == null) {
                list.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(str, str2));
            } else {
                list.add(obj);
            }
        }
    }

    @Override // org.apache.eagle.policy.PolicyEvaluator
    public void onPolicyUpdate(T t) {
        Object obj = null;
        try {
            obj = (AbstractPolicyDefinition) JsonSerDeserUtils.deserialize(t.getPolicyDef(), AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules((String) t.getTags().get(Constants.POLICY_TYPE)));
        } catch (Exception e) {
            LOG.error("Initial policy def error, ", e);
        }
        SiddhiRuntime siddhiRuntime = this.siddhiRuntime;
        this.siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition) obj);
        synchronized (siddhiRuntime) {
            if (!siddhiRuntime.markdownEnabled) {
                siddhiRuntime.siddhiManager.getExecutionPlanRuntime(siddhiRuntime.executionPlanName).shutdown();
            }
        }
    }

    @Override // org.apache.eagle.policy.PolicyEvaluator
    public void onPolicyDelete() {
        synchronized (this.siddhiRuntime) {
            LOG.info("Going to shutdown siddhi execution plan, planName: " + this.siddhiRuntime.executionPlanName);
            if (!this.siddhiRuntime.markdownEnabled) {
                this.siddhiRuntime.siddhiManager.getExecutionPlanRuntime(this.siddhiRuntime.executionPlanName).shutdown();
            }
            LOG.info("Siddhi execution plan " + this.siddhiRuntime.executionPlanName + " is successfully shutdown ");
        }
    }

    public String toString() {
        return this.siddhiRuntime.policyDef.toString();
    }

    public String[] getStreamNames() {
        return this.sourceStreams;
    }

    @Override // org.apache.eagle.policy.PolicyEvaluator
    public Map<String, String> getAdditionalContext() {
        HashMap hashMap = new HashMap();
        StringBuilder sb = new StringBuilder();
        for (String str : getStreamNames()) {
            sb.append(str + ",");
        }
        if (sb.length() > 0) {
            sb.deleteCharAt(sb.length() - 1);
        }
        hashMap.put(Constants.SOURCE_STREAMS, sb.toString());
        hashMap.put(Constants.POLICY_ID, this.context.policyId);
        return hashMap;
    }

    public List<String> getOutputStreamAttrNameList() {
        return this.siddhiRuntime.outputFields;
    }

    @Override // org.apache.eagle.policy.PolicyEvaluator
    public boolean isMarkdownEnabled() {
        return this.siddhiRuntime.markdownEnabled;
    }

    @Override // org.apache.eagle.policy.PolicyEvaluator
    public String getMarkdownReason() {
        return this.siddhiRuntime.markdownReason;
    }
}
