package org.apache.falcon.messaging;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.messaging.util.MessagingUtil;
import org.apache.falcon.retention.EvictedInstanceSerDe;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/falcon/messaging/JMSMessageProducer.class */
public class JMSMessageProducer {
    private static final Logger LOG = LoggerFactory.getLogger(JMSMessageProducer.class);
    public static final String FALCON_TOPIC_PREFIX = "FALCON.";
    public static final String ENTITY_TOPIC_NAME = "ENTITY.TOPIC";
    private static final long DEFAULT_TTL = 259200000;
    private final WorkflowExecutionContext context;
    private final MessageType messageType;

    /* loaded from: input_file:org/apache/falcon/messaging/JMSMessageProducer$MessageBuilder.class */
    public static final class MessageBuilder {
        private final WorkflowExecutionContext context;
        private MessageType type;

        private MessageBuilder(WorkflowExecutionContext workflowExecutionContext) {
            this.context = workflowExecutionContext;
        }

        public MessageBuilder type(MessageType messageType) {
            this.type = messageType;
            return this;
        }

        public JMSMessageProducer build() {
            if (this.type == null) {
                throw new IllegalArgumentException("Message messageType needs to be set.");
            }
            return new JMSMessageProducer(this.context, this.type);
        }
    }

    /* loaded from: input_file:org/apache/falcon/messaging/JMSMessageProducer$MessageType.class */
    public enum MessageType {
        FALCON,
        USER
    }

    protected JMSMessageProducer(WorkflowExecutionContext workflowExecutionContext, MessageType messageType) {
        this.context = workflowExecutionContext;
        this.messageType = messageType;
    }

    public String getTopicName() {
        String value = this.context.getValue(WorkflowExecutionArgs.TOPIC_NAME);
        if (value != null) {
            return value;
        }
        return FALCON_TOPIC_PREFIX + (this.messageType == MessageType.FALCON ? ENTITY_TOPIC_NAME : this.context.getEntityName());
    }

    public String getBrokerImplClass() {
        return this.messageType == MessageType.FALCON ? this.context.getValue(WorkflowExecutionArgs.BRKR_IMPL_CLASS) : this.context.getValue(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS);
    }

    public String getBrokerUrl() {
        return this.messageType == MessageType.FALCON ? this.context.getValue(WorkflowExecutionArgs.BRKR_URL) : this.context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
    }

    private long getBrokerTTL() {
        long j = 259200000;
        try {
            j = Long.parseLong(this.context.getValue(WorkflowExecutionArgs.BRKR_TTL)) * 60 * 1000;
        } catch (NumberFormatException e) {
            LOG.error("Error in parsing broker.ttl, setting TTL to: {} milli-seconds", Long.valueOf(DEFAULT_TTL));
        }
        return j;
    }

    public static MessageBuilder builder(WorkflowExecutionContext workflowExecutionContext) {
        return new MessageBuilder(workflowExecutionContext);
    }

    public int sendMessage() throws Exception {
        return sendMessage(WorkflowExecutionArgs.values());
    }

    public int sendMessage(WorkflowExecutionArgs[] workflowExecutionArgsArr) throws Exception {
        List<Map<String, String>> buildMessageList = buildMessageList(workflowExecutionArgsArr);
        if (buildMessageList.isEmpty()) {
            LOG.warn("No operation on output feed");
            return 0;
        }
        Connection connection = null;
        try {
            connection = createAndStartConnection(getBrokerImplClass(), "", "", getBrokerUrl());
            for (Map<String, String> map : buildMessageList) {
                LOG.info("Sending message: {}", map);
                sendMessage(connection, map);
            }
            MessagingUtil.closeQuietly(connection);
            return 0;
        } catch (Throwable th) {
            MessagingUtil.closeQuietly(connection);
            throw th;
        }
    }

    private List<Map<String, String>> buildMessageList(WorkflowExecutionArgs[] workflowExecutionArgsArr) {
        String[] outputFeedNamesList = this.context.getOutputFeedNamesList();
        if (outputFeedNamesList == null) {
            return Collections.emptyList();
        }
        try {
            String[] feedPaths = getFeedPaths();
            ArrayList arrayList = new ArrayList(feedPaths.length);
            for (int i = 0; i < feedPaths.length; i++) {
                Map<String, String> buildMessage = buildMessage(workflowExecutionArgsArr);
                if (this.context.getEntityType().equalsIgnoreCase("PROCESS")) {
                    change(buildMessage, WorkflowExecutionArgs.OUTPUT_FEED_NAMES, outputFeedNamesList[i]);
                } else {
                    change(buildMessage, WorkflowExecutionArgs.OUTPUT_FEED_NAMES, buildMessage.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()));
                }
                change(buildMessage, WorkflowExecutionArgs.OUTPUT_FEED_PATHS, feedPaths[i]);
                arrayList.add(buildMessage);
            }
            return arrayList;
        } catch (Exception e) {
            LOG.error("Error getting instance paths: ", e);
            throw new RuntimeException(e);
        }
    }

    private void sendMessage(Connection connection, Map<String, String> map) throws JMSException {
        Session session = null;
        MessageProducer messageProducer = null;
        try {
            session = connection.createSession(false, 1);
            messageProducer = session.createProducer(session.createTopic(getTopicName()));
            messageProducer.setDeliveryMode(2);
            messageProducer.setTimeToLive(getBrokerTTL());
            messageProducer.send(createMessage(session, map));
            if (messageProducer != null) {
                messageProducer.close();
            }
            if (session != null) {
                session.close();
            }
        } catch (Throwable th) {
            if (messageProducer != null) {
                messageProducer.close();
            }
            if (session != null) {
                session.close();
            }
            throw th;
        }
    }

    public Message createMessage(Session session, Map<String, String> map) throws JMSException {
        MapMessage createMapMessage = session.createMapMessage();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            createMapMessage.setString(entry.getKey(), entry.getValue());
        }
        return createMapMessage;
    }

    public void change(Map<String, String> map, WorkflowExecutionArgs workflowExecutionArgs, String str) {
        map.remove(workflowExecutionArgs.getName());
        map.put(workflowExecutionArgs.getName(), str);
    }

    private String[] getFeedPaths() throws Exception {
        WorkflowExecutionContext.EntityOperations operation = this.context.getOperation();
        if (operation == WorkflowExecutionContext.EntityOperations.GENERATE || operation == WorkflowExecutionContext.EntityOperations.REPLICATE || operation == WorkflowExecutionContext.EntityOperations.IMPORT) {
            LOG.debug("Returning instance paths: " + this.context.getOutputFeedInstancePaths());
            return this.context.getOutputFeedInstancePathsList();
        }
        Path path = new Path(this.context.getLogFile());
        FileSystem createProxiedFileSystem = HadoopClientFactory.get().createProxiedFileSystem(path.toUri());
        return !createProxiedFileSystem.exists(path) ? new String[0] : EvictedInstanceSerDe.deserializeEvictedInstancePaths(createProxiedFileSystem, path);
    }

    private Map<String, String> buildMessage(WorkflowExecutionArgs[] workflowExecutionArgsArr) {
        HashMap hashMap = new HashMap(workflowExecutionArgsArr.length);
        for (WorkflowExecutionArgs workflowExecutionArgs : workflowExecutionArgsArr) {
            hashMap.put(workflowExecutionArgs.getName(), this.context.getValue(workflowExecutionArgs));
        }
        hashMap.remove(WorkflowExecutionArgs.LOG_FILE.getName());
        return hashMap;
    }

    private Connection createAndStartConnection(String str, String str2, String str3, String str4) throws JMSException, ClassNotFoundException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
        Connection createConnection = ((ConnectionFactory) JMSMessageProducer.class.getClassLoader().loadClass(str).getConstructor(String.class, String.class, String.class).newInstance(str2, str3, str4)).createConnection();
        createConnection.start();
        return createConnection;
    }
}
