package co.cask.cdap.template.etl.realtime.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.templates.plugins.PluginConfig;
import co.cask.cdap.api.templates.plugins.PluginProperties;
import co.cask.cdap.template.etl.api.Emitter;
import co.cask.cdap.template.etl.api.PipelineConfigurer;
import co.cask.cdap.template.etl.api.realtime.RealtimeContext;
import co.cask.cdap.template.etl.api.realtime.RealtimeSource;
import co.cask.cdap.template.etl.api.realtime.SourceState;
import co.cask.cdap.template.etl.realtime.jms.JmsProvider;
import co.cask.cdap.template.etl.realtime.jms.JndiBasedJmsProvider;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import javax.annotation.Nullable;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("JMS")
@Description("JMS Realtime Source - Emits a record with a field 'message' of string type")
@Plugin(type = "source")
/* loaded from: input_file:co/cask/cdap/template/etl/realtime/source/JmsSource.class */
public class JmsSource extends RealtimeSource<StructuredRecord> {
    public static final String JMS_DESTINATION_NAME = "jms.destination.name";
    public static final String JMS_MESSAGES_TO_RECEIVE = "jms.messages.receive";
    public static final String JMS_NAMING_FACTORY_INITIAL = "jms.factory.initial";
    public static final String JMS_PROVIDER_URL = "jms.provider.url";
    public static final String JMS_CONNECTION_FACTORY_NAME = "jms.jndi.connectionfactory.name";
    public static final String JMS_PLUGIN_NAME = "jms.plugin.name";
    public static final String JMS_PLUGIN_TYPE = "jms.plugin.type";
    public static final String JMS_CUSTOM_PROPERTIES = "jms.plugin.custom.properties";
    public static final String DEFAULT_CONNECTION_FACTORY = "ConnectionFactory";
    public static final String JMS_PROVIDER = "JMSProvider";
    private static final long JMS_CONSUMER_TIMEOUT_MS = 2000;
    public static final String MESSAGE = "message";
    private final JmsPluginConfig config;
    private int jmsAcknowledgeMode = 1;
    private JmsProvider jmsProvider;
    private transient Connection connection;
    private transient Session session;
    private transient MessageConsumer consumer;
    private int messagesToReceive;
    private static final Logger LOG = LoggerFactory.getLogger(JmsSource.class);
    private static final Gson GSON = new Gson();
    private static final Type STRING_MAP_TYPE = new TypeToken<Map<String, String>>() { // from class: co.cask.cdap.template.etl.realtime.source.JmsSource.1
    }.getType();
    private static final Schema SCHEMA = Schema.recordOf("JMS Message", new Schema.Field[]{Schema.Field.of("message", Schema.of(Schema.Type.STRING))});

    /* loaded from: input_file:co/cask/cdap/template/etl/realtime/source/JmsSource$JmsPluginConfig.class */
    public static class JmsPluginConfig extends PluginConfig {

        @Name(JmsSource.JMS_DESTINATION_NAME)
        @Description("Name of the destination to get messages")
        private String destinationName;

        @Name(JmsSource.JMS_MESSAGES_TO_RECEIVE)
        @Description("Max number messages should be retrieved per poll. The default value is 50.")
        @Nullable
        private Integer messagesToReceive;

        @Name(JmsSource.JMS_NAMING_FACTORY_INITIAL)
        @Description("The fully qualified class name of the factory class that will create an initial context. This will be passed to JNDI initial context as java.naming.factory.initial")
        private String initialContextFactory;

        @Name(JmsSource.JMS_PROVIDER_URL)
        @Description("This property contains information for the service provider URL to use. This will be passed to JNDI initial context as java.naming.provider.url")
        private String providerUrl;

        @Name(JmsSource.JMS_CONNECTION_FACTORY_NAME)
        @Description("The name of the connection factory from the JNDI. The default will be ConnectionFactory.")
        @Nullable
        private String connectionFactoryName;

        @Name(JmsSource.JMS_PLUGIN_NAME)
        @Description("Name of the JMS plugin to use. This is the value of the 'name' key defined in the json file for the JMS plugin. Defaults to java.naming.factory.initial")
        @Nullable
        public String jmsPluginName;

        @Name(JmsSource.JMS_PLUGIN_TYPE)
        @Description("Type of the JMS plugin to use. This is the value of the 'type' key defined in the json file for the JMS plugin. Defaults to 'JMSProvider'.")
        @Nullable
        public String jmsPluginType;

        @Name(JmsSource.JMS_CUSTOM_PROPERTIES)
        @Description("Provide any custom properties as a JSON Map")
        @Nullable
        public String customProperties;

        public JmsPluginConfig() {
            this(null, null, null, 50, JmsSource.DEFAULT_CONNECTION_FACTORY, "java.naming.factory.initial", JmsSource.JMS_PROVIDER, null);
        }

        public JmsPluginConfig(String str, String str2, String str3, @Nullable Integer num, @Nullable String str4, @Nullable String str5, @Nullable String str6, @Nullable String str7) {
            this.destinationName = str;
            if (num != null) {
                this.messagesToReceive = num;
            } else {
                this.messagesToReceive = 50;
            }
            this.initialContextFactory = str2;
            this.providerUrl = str3;
            if (str4 != null) {
                this.connectionFactoryName = str4;
            } else {
                this.connectionFactoryName = JmsSource.JMS_CONNECTION_FACTORY_NAME;
            }
            this.jmsPluginName = str5;
            if (this.jmsPluginName == null) {
                this.jmsPluginName = "java.naming.factory.initial";
            }
            this.jmsPluginType = str6;
            if (this.jmsPluginType == null) {
                this.jmsPluginType = JmsSource.JMS_PROVIDER;
            }
            this.customProperties = str7;
        }
    }

    public JmsSource(JmsPluginConfig jmsPluginConfig) {
        this.config = jmsPluginConfig;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void initialize(RealtimeContext realtimeContext) throws Exception {
        super.initialize(realtimeContext);
        HashMap newHashMap = Maps.newHashMap();
        if (this.config.getProperties() != null) {
            newHashMap.putAll(this.config.getProperties().getProperties());
        }
        if (this.config.customProperties != null) {
            newHashMap.putAll((Map) GSON.fromJson(this.config.customProperties, STRING_MAP_TYPE));
        }
        this.messagesToReceive = this.config.messagesToReceive.intValue();
        Hashtable hashtable = new Hashtable();
        for (Map.Entry entry : newHashMap.entrySet()) {
            hashtable.put(entry.getKey(), entry.getValue());
        }
        hashtable.put("java.naming.factory.initial", this.config.initialContextFactory);
        hashtable.put("java.naming.provider.url", this.config.providerUrl);
        Class loadPluginClass = realtimeContext.loadPluginClass(getPluginId());
        initializeJMSConnection(hashtable, this.config.destinationName, this.config.connectionFactoryName, loadPluginClass != null ? loadPluginClass.getClassLoader() : null);
    }

    private void initializeJMSConnection(Hashtable<String, String> hashtable, String str, String str2, ClassLoader classLoader) {
        if (this.jmsProvider == null) {
            LOG.trace("JMS provider is not set when trying to initialize JMS connection.");
            if (str == null) {
                throw new IllegalStateException("Could not have null JMSProvider for JMS Source. Please set the right JMSProvider");
            }
            LOG.trace("Using JNDI default JMS provider for destination: {}", str);
            if (classLoader != null) {
                Thread.currentThread().setContextClassLoader(classLoader);
            }
            this.jmsProvider = new JndiBasedJmsProvider(hashtable, str, str2);
        }
        try {
            this.connection = this.jmsProvider.getConnectionFactory().createConnection();
            this.session = this.connection.createSession(false, this.jmsAcknowledgeMode);
            this.consumer = this.session.createConsumer(this.jmsProvider.getDestination());
            this.connection.start();
        } catch (JMSException e) {
            if (this.session != null) {
                try {
                    this.session.close();
                } catch (JMSException e2) {
                    LOG.warn("Exception when closing session", e2);
                }
            }
            if (this.connection != null) {
                try {
                    this.connection.close();
                } catch (JMSException e3) {
                    LOG.warn("Exception when closing connection", e3);
                }
            }
            throw new RuntimeException("JMSException thrown when trying to initialize connection", e);
        }
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        Preconditions.checkArgument(pipelineConfigurer.usePluginClass(this.config.jmsPluginType, this.config.jmsPluginName, getPluginId(), PluginProperties.builder().build()) != null, "JMS Initial Connection Factory Context class must be found.");
    }

    private String getPluginId() {
        return String.format("%s.%s.%s", "jmsource", this.config.jmsPluginType, this.config.jmsPluginName);
    }

    @Nullable
    public SourceState poll(Emitter<StructuredRecord> emitter, SourceState sourceState) {
        String obj;
        Message message = null;
        int i = 0;
        do {
            try {
                message = this.consumer.receive(JMS_CONSUMER_TIMEOUT_MS);
            } catch (JMSException e) {
                LOG.warn("Exception when trying to receive message from JMS consumer.");
            }
            if (message != null) {
                try {
                    if (message instanceof TextMessage) {
                        obj = ((TextMessage) message).getText();
                        LOG.trace("Process JMS TextMessage : ", obj);
                    } else if (message instanceof BytesMessage) {
                        obj = ((BytesMessage) message).readUTF();
                        LOG.trace("Processing JMS ByteMessage : {}", obj);
                    } else {
                        obj = message.toString();
                        LOG.trace("Processing JMS message : ", obj);
                    }
                    emitter.emit(stringMessageToStructuredRecord(obj));
                    i++;
                } catch (JMSException e2) {
                    LOG.error("Unable to read text from a JMS Message.");
                }
            }
            if (message == null) {
                break;
            }
        } while (i < this.messagesToReceive);
        return new SourceState(sourceState.getState());
    }

    private static StructuredRecord stringMessageToStructuredRecord(String str) {
        StructuredRecord.Builder builder = StructuredRecord.builder(SCHEMA);
        builder.set("message", str);
        return builder.build();
    }

    public void destroy() {
        try {
            if (this.consumer != null) {
                this.consumer.close();
            }
            if (this.session != null) {
                this.session.close();
            }
            if (this.connection != null) {
                this.connection.close();
            }
        } catch (Exception e) {
            throw new RuntimeException("Exception on closing JMS connection: " + e.getMessage(), e);
        }
    }

    public void setSessionAcknowledgeMode(int i) {
        switch (i) {
            case 0:
            case 1:
            case 2:
            case 3:
                this.jmsAcknowledgeMode = i;
                return;
            default:
                throw new IllegalArgumentException("Unknown JMS Session acknowledge mode: " + i);
        }
    }

    public void setJmsProvider(JmsProvider jmsProvider) {
        this.jmsProvider = jmsProvider;
    }

    public JmsProvider getJmsProvider() {
        return this.jmsProvider;
    }
}
