package org.apache.oozie.service;

import java.net.URI;
import java.util.Random;
import org.apache.activemq.broker.BrokerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.dependency.hcat.HCatMessageHandler;
import org.apache.oozie.jms.ConnectionContext;
import org.apache.oozie.jms.DefaultConnectionContext;
import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.jms.MessageReceiver;
import org.apache.oozie.test.XTestCase;
import org.junit.Test;

/* loaded from: input_file:org/apache/oozie/service/TestJMSAccessorService.class */
public class TestJMSAccessorService extends XTestCase {
    private Services services;
    private static Random random = new Random();

    /**
     * Prepares the fixture for each test: runs the base-class set-up, builds the
     * service set returned by {@code setupServicesForHCatalog()} and initialises it.
     *
     * NOTE(review): the decompiler reported that this method's access modifier was
     * changed from {@code protected}; confirm against the original source.
     *
     * @throws Exception if the base set-up or the service initialisation fails
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        services = setupServicesForHCatalog();
        services.init();
    }

    /**
     * Destroys the services created for the test and then runs the base-class
     * tear-down.
     *
     * The base-class tear-down is executed from a {@code finally} block so that it
     * still runs when destroying the services throws; the services are only
     * destroyed when they were actually created.
     *
     * NOTE(review): the decompiler reported that this method's access modifier was
     * changed from {@code protected}; confirm against the original source.
     *
     * @throws Exception if destroying the services or the base tear-down fails
     */
    @Override
    public void tearDown() throws Exception {
        try {
            if (this.services != null) {
                this.services.destroy();
            }
        } finally {
            // Always give the base class a chance to clean up its own state.
            super.tearDown();
        }
    }

    /**
     * Requests a connection context for two different server URIs and checks that
     * both contexts report an initialised connection and compare equal to each
     * other. The first context is closed at the end of the test.
     *
     * @throws Exception if a URI cannot be parsed or a service call fails
     */
    @Test
    public void testConnection() throws Exception {
        HCatAccessorService hcatService = services.get(HCatAccessorService.class);
        JMSAccessorService jmsService = services.get(JMSAccessorService.class);

        JMSConnectionInfo hcatServerInfo =
                hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
        ConnectionContext firstContext = jmsService.createConnectionContext(hcatServerInfo);
        assertTrue(firstContext.isConnectionInitialized());

        JMSConnectionInfo unknownServerInfo =
                hcatService.getJMSConnectionInfo(new URI("http://unknown:80"));
        ConnectionContext secondContext = jmsService.createConnectionContext(unknownServerInfo);
        assertTrue(secondContext.isConnectionInitialized());

        // Both URIs are expected to end up with an equal connection context.
        assertEquals(firstContext, secondContext);
        firstContext.close();
    }

    /**
     * Registers the same topic twice on one connection and checks that the message
     * receiver returned after the second registration equals the one returned
     * after the first.
     *
     * The first receiver is asserted to be non-null; otherwise the final equality
     * check would also pass when no receiver was ever created (null equals null).
     * Unexpected exceptions propagate to the test framework so that the full stack
     * trace is reported, as in the other tests of this class.
     *
     * @throws Exception if the URI cannot be parsed or a service call fails
     */
    @Test
    public void testRegisterSingleConsumerPerTopic() throws Exception {
        HCatAccessorService hcatService = this.services.get(HCatAccessorService.class);
        JMSAccessorService jmsService = this.services.get(JMSAccessorService.class);
        String topic = "hcat.mydb.mytable";
        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));

        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler("hcat.server.com:5080"));
        MessageReceiver firstReceiver = jmsService.getMessageReceiver(connInfo, topic);
        assertNotNull(firstReceiver);

        // Second registration for the same topic.
        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler("hcat.server.com:5080"));
        assertEquals(firstReceiver, jmsService.getMessageReceiver(connInfo, topic));
    }

    /**
     * Registers a topic, checks that a message receiver exists for it, unregisters
     * the topic and checks that no receiver is returned any more.
     *
     * Unexpected exceptions propagate to the test framework so that the full stack
     * trace is reported, as in the other tests of this class.
     *
     * @throws Exception if the URI cannot be parsed or a service call fails
     */
    @Test
    public void testUnRegisterTopic() throws Exception {
        HCatAccessorService hcatService = this.services.get(HCatAccessorService.class);
        JMSAccessorService jmsService = this.services.get(JMSAccessorService.class);
        String topic = "hcatalog.mydb.mytable";
        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));

        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler("hcat.server.com:5080"));
        assertNotNull(jmsService.getMessageReceiver(connInfo, topic));

        jmsService.unregisterFromNotification(connInfo, topic);
        assertNull(jmsService.getMessageReceiver(connInfo, topic));
    }

    /**
     * Re-initialises the services with a templated JMS connection mapping
     * (containing {@code ${1}} and {@code ${2}} placeholders), checks the JNDI
     * properties string resolved for a concrete HCat URI, and opens a connection
     * with the resolved JNDI properties.
     *
     * The connection context opened here is closed in a {@code finally} block so
     * the connection does not outlive the test. Any exception is reported as a
     * test failure.
     *
     * @throws ServiceException declared for compatibility; exceptions raised in
     *         the body are caught and turned into a test failure
     */
    @Test
    public void testConnectionContext() throws ServiceException {
        try {
            this.services.destroy();
            this.services = super.setupServicesForHCatalog();

            // Shared by the configured mapping and the expected resolved string.
            String jndiPrefix = "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;"
                    + "java.naming.provider.url#vm://localhost?broker.persistent=false;"
                    + "connectionFactoryNames#dynamicFactories/hcat.prod.";
            this.services.getConf().set("oozie.service.HCatAccessorService.jmsconnections",
                    "hcat://${1}.${2}.server.com:8020=" + jndiPrefix + "${1}");
            this.services.init();

            JMSConnectionInfo connInfo = this.services.get(HCatAccessorService.class)
                    .getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
            // "${1}" is expected to resolve to the first host label of the URI.
            assertEquals(jndiPrefix + "hcatserver", connInfo.getJNDIPropertiesString());

            DefaultConnectionContext context = new DefaultConnectionContext();
            context.createConnection(connInfo.getJNDIProperties());
            try {
                // NOTE(review): this broker is never started before stop() is called;
                // kept as in the original test - confirm the intent.
                BrokerService broker = new BrokerService();
                broker.setDataDirectory(getTestCaseDir());
                broker.stop();
            } finally {
                // Release the connection opened above even if the broker calls fail.
                context.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
            fail("Unexpected exception " + e);
        }
    }

    /**
     * Registers a topic while no broker is listening on the configured URL and
     * checks that the connection and the topic are placed in the retry list; then
     * starts a broker on that URL and checks that the topic is being listened to
     * and that both retry-list entries are gone.
     *
     * The broker is stopped in a {@code finally} block so that a failing assertion
     * does not leave it running on its port.
     *
     * @throws Exception if a service call, the broker or the wait fails
     */
    @Test
    public void testConnectionRetry() throws Exception {
        this.services.destroy();
        this.services = super.setupServicesForHCatalog();

        // Port picked at random from [30000, 40000).
        String brokerUrl = "tcp://localhost:" + (30000 + random.nextInt(10000));
        Configuration conf = this.services.getConf();
        // NOTE(review): the delay unit is not visible here (presumably seconds) - confirm.
        conf.set("oozie.service.JMSAccessorService.retry.initial.delay", "1");
        conf.set("oozie.service.JMSAccessorService.retry.max.attempts", "3");
        conf.set("oozie.service.HCatAccessorService.jmsconnections", "default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" + brokerUrl + ";connectionFactoryNames#ConnectionFactory");
        this.services.init();

        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
        JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
        String topic = "topic.topic1";
        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));

        // No broker is running yet, so the registration is expected to be queued for retry.
        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler("hcat.server.com:5080"));
        assertFalse(jmsService.isListeningToTopic(connInfo, topic));
        assertTrue(jmsService.isConnectionInRetryList(connInfo));
        assertTrue(jmsService.isTopicInRetryList(connInfo, topic));

        BrokerService broker = new BrokerService();
        try {
            broker.addConnector(brokerUrl);
            broker.setDataDirectory(getTestCaseDir());
            broker.start();
            // Give the retry mechanism time to connect to the freshly started broker.
            Thread.sleep(1000L);
            assertTrue(jmsService.isListeningToTopic(connInfo, topic));
            assertFalse(jmsService.isConnectionInRetryList(connInfo));
            assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
        } finally {
            // Stop the broker even when an assertion above fails.
            broker.stop();
        }
        jmsService.destroy();
    }

    /**
     * Checks recovery after a broker goes away: a topic is registered against a
     * running broker, the broker is stopped, creating a session is expected to
     * fail and to move the connection and topic into the retry list, and after a
     * new broker is started on the same URL the topic is expected to be listened
     * to again.
     *
     * Each broker is stopped in a {@code finally} block so that a failing
     * assertion does not leave it running on its port. On the success path the
     * brokers are started and stopped in the same order as before.
     *
     * @throws Exception if a service call, a broker or a wait fails
     */
    @Test
    public void testConnectionRetryExceptionListener() throws Exception {
        this.services.destroy();
        this.services = super.setupServicesForHCatalog();

        // Port picked at random from [30000, 40000).
        String brokerUrl = "tcp://localhost:" + (30000 + random.nextInt(10000));
        String jndiProperties = "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" + brokerUrl + ";connectionFactoryNames#ConnectionFactory";
        Configuration conf = this.services.getConf();
        // NOTE(review): the delay unit is not visible here (presumably seconds) - confirm.
        conf.set("oozie.service.JMSAccessorService.retry.initial.delay", "1");
        conf.set("oozie.service.JMSAccessorService.retry.max.attempts", "3");
        conf.set("oozie.service.HCatAccessorService.jmsconnections", "default=" + jndiProperties);
        this.services.init();

        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
        JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
        String topic = "topic.topic1";
        JMSConnectionInfo connInfo;
        ConnectionContext context;

        BrokerService firstBroker = new BrokerService();
        try {
            firstBroker.addConnector(brokerUrl);
            firstBroker.setDataDirectory(getTestCaseDir());
            firstBroker.start();

            connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
            jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler("hcat.server.com:5080"));
            assertTrue(jmsService.isListeningToTopic(connInfo, topic));
            assertFalse(jmsService.isConnectionInRetryList(connInfo));
            assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
            context = jmsService.createConnectionContext(connInfo);
        } finally {
            // Stopping here both simulates the broker outage (success path) and
            // releases the port when something above failed.
            firstBroker.stop();
        }

        try {
            // 1 is the value of javax.jms.Session.AUTO_ACKNOWLEDGE; presumably the
            // argument is the session acknowledge mode - confirm.
            context.createSession(1);
            fail("Exception expected");
        } catch (Exception expected) {
            // Short pause before checking that the failure was registered.
            Thread.sleep(100L);
            assertFalse(jmsService.isListeningToTopic(connInfo, topic));
            assertTrue(jmsService.isConnectionInRetryList(connInfo));
            assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
        }

        BrokerService secondBroker = new BrokerService();
        try {
            secondBroker.addConnector(brokerUrl);
            secondBroker.setDataDirectory(getTestCaseDir());
            secondBroker.start();
            // Give the retry mechanism time to reconnect to the new broker.
            Thread.sleep(1000L);
            assertTrue(jmsService.isListeningToTopic(connInfo, topic));
            assertFalse(jmsService.isConnectionInRetryList(connInfo));
            assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
        } finally {
            // Stop the broker even when an assertion above fails.
            secondBroker.stop();
        }
        jmsService.destroy();
    }

    /**
     * Configures a single allowed retry attempt against a URL where no broker is
     * started by this test, registers a topic and checks that the connection and
     * topic stay in the retry list, that exactly one connection attempt has been
     * counted after the wait, and that a further {@code retryConnection} call
     * returns false.
     *
     * @throws Exception if a service call or the wait fails
     */
    @Test
    public void testConnectionRetryMaxAttempt() throws Exception {
        services.destroy();
        services = setupServicesForHCatalog();

        String jmsConnections = "default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:12345;connectionFactoryNames#ConnectionFactory";
        Configuration configuration = services.getConf();
        configuration.set("oozie.service.JMSAccessorService.retry.initial.delay", "1");
        configuration.set("oozie.service.JMSAccessorService.retry.max.attempts", "1");
        configuration.set("oozie.service.HCatAccessorService.jmsconnections", jmsConnections);
        services.init();

        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
        JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
        final String topic = "topic.topic1";
        URI serverUri = new URI("hcat://hcat.server.com:8020");
        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(serverUri);
        HCatMessageHandler handler = new HCatMessageHandler("hcat.server.com:5080");
        jmsService.registerForNotification(connInfo, topic, handler);

        // State right after the failed registration.
        assertTrue(jmsService.isConnectionInRetryList(connInfo));
        assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
        assertFalse(jmsService.isListeningToTopic(connInfo, topic));

        Thread.sleep(1100L);

        // State after the wait: still not connected, one attempt recorded.
        assertTrue(jmsService.isConnectionInRetryList(connInfo));
        assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
        assertFalse(jmsService.isListeningToTopic(connInfo, topic));
        assertEquals(1, jmsService.getNumConnectionAttempts(connInfo));
        assertFalse(jmsService.retryConnection(connInfo));

        jmsService.destroy();
    }
}
