package org.apache.eagle.notification.plugin;

import com.typesafe.config.Config;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.alert.entity.AlertAPIEntity;
import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
import org.apache.eagle.notification.base.NotificationConstants;
import org.apache.eagle.notification.base.NotificationStatus;
import org.apache.eagle.notification.utils.NotificationPluginUtils;
import org.apache.eagle.policy.common.Constants;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eagle/notification/plugin/AlertKafkaPlugin.class */
/**
 * Notification plugin that writes alerts to Kafka.
 *
 * <p>For every policy id the plugin keeps the notification configurations whose
 * {@link NotificationConstants#NOTIFICATION_TYPE} equals (ignoring case)
 * {@link NotificationConstants#KAFKA_STORE}. When an alert arrives, one record is sent per
 * stored configuration and the outcome of each send attempt is appended to the status list.
 */
public class AlertKafkaPlugin implements NotificationPlugin {
    private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPlugin.class);

    /** Outcome of every send attempt made by {@link #onAlert}; only ever appended to in this class. */
    private final List<NotificationStatus> statusList = new Vector<>();

    /** Policy id -> Kafka notification configurations registered for that policy. */
    private final Map<String, List<Map<String, String>>> kafaConfigs = new ConcurrentHashMap<>();

    /** Configuration handed to {@link #init}; stored but not read anywhere else in this class. */
    private Config config;

    /**
     * Stores the configuration and registers the notification definitions of the given policies.
     *
     * @param config plugin configuration
     * @param list   policy definitions whose notification definitions are deserialized and
     *               passed to {@link #update} as non-delete updates
     * @throws Exception if deserializing or registering any definition fails
     */
    @Override // org.apache.eagle.notification.plugin.NotificationPlugin
    public void init(Config config, List<AlertDefinitionAPIEntity> list) throws Exception {
        this.config = config;
        for (AlertDefinitionAPIEntity alertDefinitionAPIEntity : list) {
            String policyId = alertDefinitionAPIEntity.getTags().get(Constants.POLICY_ID);
            List<Map<String, String>> notificationConfigs =
                    NotificationPluginUtils.deserializeNotificationConfig(alertDefinitionAPIEntity.getNotificationDef());
            update(policyId, notificationConfigs, false);
        }
    }

    /**
     * Registers or removes the Kafka notification configurations of one policy.
     *
     * <p>When {@code z} is {@code true} the policy's entry is removed and {@code list} is not
     * read. Otherwise the Kafka-typed entries of {@code list} replace the policy's entry; if
     * {@code list} contains no Kafka-typed entry, any existing entry is left untouched.
     *
     * @param str  policy id
     * @param list notification configurations of the policy (all types); entries without a
     *             notification type are logged and skipped
     * @param z    {@code true} if the policy has been deleted
     * @throws Exception declared by the {@link NotificationPlugin} contract
     */
    @Override // org.apache.eagle.notification.plugin.NotificationPlugin
    public void update(String str, List<Map<String, String>> list, boolean z) throws Exception {
        if (z) {
            LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
            this.kafaConfigs.remove(str);
            return;
        }
        List<Map<String, String>> kafkaConfigList = new Vector<>();
        for (Map<String, String> notificationConfig : list) {
            String notificationType = notificationConfig.get(NotificationConstants.NOTIFICATION_TYPE);
            if (notificationType == null) {
                LOG.error("invalid notificationType for this notification, ignoring and continue {}", notificationConfig);
            } else if (notificationType.equalsIgnoreCase(NotificationConstants.KAFKA_STORE)) {
                // A policy may carry configurations of several types; only the Kafka ones are kept.
                kafkaConfigList.add(notificationConfig);
            }
        }
        if (!kafkaConfigList.isEmpty()) {
            this.kafaConfigs.put(str, kafkaConfigList);
        }
    }

    /**
     * Sends the alert once for every Kafka configuration registered for the alert's policy.
     *
     * <p>Each attempt appends a {@link NotificationStatus} to the status list: successful when
     * the send call returns without throwing, otherwise failed with the exception message.
     * If the alert carries no policy id, or no Kafka configuration is registered for it, the
     * alert is logged and ignored instead of failing with a {@link NullPointerException}.
     *
     * @param alertAPIEntity alert to publish
     */
    @Override // org.apache.eagle.notification.plugin.NotificationPlugin
    public void onAlert(AlertAPIEntity alertAPIEntity) {
        Map<String, String> tags = alertAPIEntity.getTags();
        String policyId = tags == null ? null : tags.get(Constants.POLICY_ID);
        // ConcurrentHashMap rejects null keys, so a missing policy id must not reach get().
        List<Map<String, String>> configs = policyId == null ? null : this.kafaConfigs.get(policyId);
        if (configs == null) {
            LOG.warn("no Kafka notification config registered for policy {}, ignoring alert", policyId);
            return;
        }
        for (Map<String, String> kafkaConfig : configs) {
            NotificationStatus notificationStatus = new NotificationStatus();
            try {
                KafkaProducerSingleton.INSTANCE.getProducer(kafkaConfig)
                        .send(createRecord(alertAPIEntity, kafkaConfig.get(NotificationConstants.TOPIC)));
                notificationStatus.successful = true;
                notificationStatus.errorMessage = "";
            } catch (Exception e) {
                // One failing target must not prevent delivery to the remaining configurations.
                LOG.error("fail writing alert to Kafka bus", e);
                notificationStatus.successful = false;
                notificationStatus.errorMessage = e.getMessage();
            }
            this.statusList.add(notificationStatus);
        }
    }

    /**
     * Builds the record for one alert, using the alert's string form as the record value.
     *
     * @param alertAPIEntity alert to serialize via {@link NotificationPluginUtils#objectToStr}
     * @param str            target topic name
     * @return record addressed to {@code str}
     * @throws Exception if the alert cannot be converted to a string
     */
    // NOTE(review): ProducerRecord is left raw because the producer's key/value types are
    // declared by KafkaProducerSingleton, which is not visible from this class.
    private ProducerRecord createRecord(AlertAPIEntity alertAPIEntity, String str) throws Exception {
        return new ProducerRecord(str, NotificationPluginUtils.objectToStr(alertAPIEntity));
    }

    /**
     * Returns the live status list, not a copy.
     *
     * @return the status list that {@link #onAlert} appends to
     */
    @Override // org.apache.eagle.notification.plugin.NotificationPlugin
    public List<NotificationStatus> getStatusList() {
        return this.statusList;
    }

    /**
     * Hash derived from the class name only, consistent with {@link #equals(Object)}.
     */
    @Override
    public int hashCode() {
        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
    }

    /**
     * Any two {@code AlertKafkaPlugin} instances are considered equal, regardless of state.
     */
    @Override
    public boolean equals(Object obj) {
        return obj == this || (obj instanceof AlertKafkaPlugin);
    }
}
