package org.apache.sentry.api.service.thrift;

import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.sentry.core.common.utils.PubSub;
import org.apache.sentry.service.thrift.SentryServiceIntegrationBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/sentry/api/service/thrift/TestSentryServerPubSub.class */
public class TestSentryServerPubSub extends SentryServiceIntegrationBase {
    private static final PubSub.Topic[] topics = PubSub.Topic.values();
    private static final String[] messages = {"message1", "message2", "message3", ""};
    private static volatile String REQUEST_URL;
    private final TestSubscriber testSubscriber = new TestSubscriber();

    /* loaded from: input_file:org/apache/sentry/api/service/thrift/TestSentryServerPubSub$TestSubscriber.class */
    private static final class TestSubscriber implements PubSub.Subscriber {
        private volatile PubSub.Topic topic;
        private volatile String message;
        private volatile int count;

        private TestSubscriber() {
        }

        public void onMessage(PubSub.Topic topic, String str) {
            this.topic = topic;
            this.message = str;
            this.count++;
        }
    }

    @BeforeClass
    public static void setup() throws Exception {
        webServerEnabled = true;
        webSecurity = false;
        SentryServiceIntegrationBase.setup();
        REQUEST_URL = "http://" + SERVER_HOST + ":" + webServerPort + "/admin/publishMessage?topic=%s&message=%s";
    }

    @Override // org.apache.sentry.service.thrift.SentryServiceIntegrationBase
    @Before
    public void before() throws Exception {
        this.testSubscriber.count = 0;
        for (PubSub.Topic topic : topics) {
            PubSub.getInstance().subscribe(topic, this.testSubscriber);
        }
        Assert.assertEquals("Unexpected number of registered topics", topics.length, PubSub.getInstance().getTopics().size());
    }

    @Override // org.apache.sentry.service.thrift.SentryServiceIntegrationBase
    @After
    public void after() {
        for (PubSub.Topic topic : topics) {
            PubSub.getInstance().unsubscribe(topic, this.testSubscriber);
        }
        this.testSubscriber.count = 0;
        Assert.assertTrue("Topics should have been removed after unsubscribe()", PubSub.getInstance().getTopics().isEmpty());
    }

    @Test
    public void testPubSub() throws Exception {
        int i = 0;
        for (PubSub.Topic topic : topics) {
            for (String str : messages) {
                HttpURLConnection httpURLConnection = null;
                try {
                    httpURLConnection = (HttpURLConnection) new URL(String.format(REQUEST_URL, topic.getName(), str)).openConnection();
                    Assert.assertEquals("Unexpected response code", 200L, httpURLConnection.getResponseCode());
                    safeClose(httpURLConnection);
                    Assert.assertEquals("Unexpected topic", topic, this.testSubscriber.topic);
                    if (str.isEmpty()) {
                        Assert.assertEquals("Unexpected message", (Object) null, this.testSubscriber.message);
                    } else {
                        Assert.assertEquals("Unexpected message", str, this.testSubscriber.message);
                    }
                    i++;
                    Assert.assertEquals("Unexpected number of PubSub.onMessage() callbacks", i, this.testSubscriber.count);
                } catch (Throwable th) {
                    safeClose(httpURLConnection);
                    throw th;
                }
            }
        }
    }

    @Test
    public void testPubSubEmptyTopic() throws Exception {
        HttpURLConnection httpURLConnection = null;
        try {
            httpURLConnection = (HttpURLConnection) new URL(String.format(REQUEST_URL, "", "message")).openConnection();
            Assert.assertEquals("Unexpected response code", 200L, httpURLConnection.getResponseCode());
            safeClose(httpURLConnection);
            Assert.assertEquals("Unexpected number of PubSub.onMessage() callbacks", 0L, this.testSubscriber.count);
        } catch (Throwable th) {
            safeClose(httpURLConnection);
            throw th;
        }
    }

    @Test
    public void testPubSubInvalidTopic() throws Exception {
        for (String str : new String[]{"invalid_topic_1", "invalid_topic_2", "invalid_topic_3"}) {
            HttpURLConnection httpURLConnection = null;
            try {
                httpURLConnection = (HttpURLConnection) new URL(String.format(REQUEST_URL, str, "message")).openConnection();
                Assert.assertEquals("Unexpected response code", 400L, httpURLConnection.getResponseCode());
                safeClose(httpURLConnection);
                Assert.assertEquals("Unexpected number of PubSub.onMessage() callbacks", 0L, this.testSubscriber.count);
            } catch (Throwable th) {
                safeClose(httpURLConnection);
                throw th;
            }
        }
    }

    @Test
    public void testPubSubNonSubscribedTopic() throws Exception {
        PubSub.getInstance().unsubscribe(PubSub.Topic.HDFS_SYNC_HMS, this.testSubscriber);
        Assert.assertEquals("Unexpected number of registered topics", topics.length - 1, PubSub.getInstance().getTopics().size());
        HttpURLConnection httpURLConnection = null;
        try {
            httpURLConnection = (HttpURLConnection) new URL(String.format(REQUEST_URL, PubSub.Topic.HDFS_SYNC_HMS.getName(), "message")).openConnection();
            Assert.assertEquals("Unexpected response code", 400L, httpURLConnection.getResponseCode());
            safeClose(httpURLConnection);
            PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this.testSubscriber);
        } catch (Throwable th) {
            safeClose(httpURLConnection);
            throw th;
        }
    }

    private static void safeClose(HttpURLConnection httpURLConnection) {
        if (httpURLConnection != null) {
            try {
                httpURLConnection.disconnect();
            } catch (Exception e) {
            }
        }
    }
}
