package co.cask.cdap.messaging.server;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.TopicAlreadyExistsException;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.metrics.NoOpMetricsCollectionService;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.RollbackDetail;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.client.ClientMessagingService;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.messaging.guice.MessagingServerRuntimeModule;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.tephra.Transaction;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/messaging/server/MessagingHttpServiceTest.class */
public class MessagingHttpServiceTest {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    private static CConfiguration cConf;
    private static MessagingHttpService httpService;
    private static MessagingService client;

    @BeforeClass
    public static void init() throws IOException {
        cConf = CConfiguration.create();
        cConf.set("local.data.dir", TEMP_FOLDER.newFolder().getAbsolutePath());
        cConf.setInt("messaging.http.server.consume.chunk.size", 128);
        cConf.setLong("data.tx.max.lifetime", 10000000000L);
        Injector createInjector = Guice.createInjector(new Module[]{new ConfigModule(cConf), new DiscoveryRuntimeModule().getInMemoryModules(), new MessagingServerRuntimeModule().getInMemoryModules(), new AbstractModule() { // from class: co.cask.cdap.messaging.server.MessagingHttpServiceTest.1
            protected void configure() {
                bind(MetricsCollectionService.class).toInstance(new NoOpMetricsCollectionService());
            }
        }});
        httpService = (MessagingHttpService) createInjector.getInstance(MessagingHttpService.class);
        httpService.startAndWait();
        client = new ClientMessagingService((DiscoveryServiceClient) createInjector.getInstance(DiscoveryServiceClient.class));
    }

    @AfterClass
    public static void finish() {
        httpService.stopAndWait();
    }

    @Test
    public void testTxMaxLifeTime() throws Exception {
        TopicId topicId = new NamespaceId("txCheck").topic("t1");
        client.createTopic(new TopicMetadata(topicId, new Object[0]));
        RollbackDetail publish = client.publish(StoreRequestBuilder.of(topicId).setTransaction(1L).addPayloads(new String[]{"a", "b"}).build());
        try {
            client.publish(StoreRequestBuilder.of(topicId).setTransaction(-9223372036854775807L).addPayloads(new String[]{"c", "d"}).build());
            Assert.fail("Expected IOException");
        } catch (IOException e) {
        }
        HashSet hashSet = new HashSet();
        CloseableIterator fetch = client.prepareFetch(topicId).fetch();
        while (fetch.hasNext()) {
            hashSet.add(Bytes.toString(((RawMessage) fetch.next()).getPayload()));
        }
        Assert.assertEquals(2L, hashSet.size());
        Assert.assertTrue(hashSet.contains("a"));
        Assert.assertTrue(hashSet.contains("b"));
        fetch.close();
        client.rollback(topicId, publish);
        client.deleteTopic(topicId);
    }

    @Test
    public void testMetadataEndpoints() throws Exception {
        NamespaceId namespaceId = new NamespaceId("metadata");
        TopicId topicId = namespaceId.topic("t1");
        TopicId topicId2 = namespaceId.topic("t2");
        try {
            client.getTopic(topicId);
            Assert.fail("Expected TopicNotFoundException");
        } catch (TopicNotFoundException e) {
        }
        client.createTopic(new TopicMetadata(topicId, new Object[0]));
        try {
            client.createTopic(new TopicMetadata(topicId, new Object[0]));
            Assert.fail("Expect TopicAlreadyExistsException");
        } catch (TopicAlreadyExistsException e2) {
        }
        Assert.assertEquals(cConf.getInt("messaging.topic.default.ttl.seconds"), client.getTopic(topicId).getTTL());
        client.updateTopic(new TopicMetadata(topicId, new Object[]{"ttl", "5"}));
        Assert.assertEquals(5L, client.getTopic(topicId).getTTL());
        try {
            client.createTopic(new TopicMetadata(topicId2, new Object[]{"ttl", "xyz"}));
            Assert.fail("Expect BadRequestException");
        } catch (IllegalArgumentException e3) {
        }
        client.createTopic(new TopicMetadata(topicId2, new Object[]{"ttl", "5"}));
        Assert.assertEquals(5L, client.getTopic(topicId2).getTTL());
        Assert.assertEquals(Arrays.asList(topicId, topicId2), client.listTopics(namespaceId));
        client.deleteTopic(topicId);
        client.deleteTopic(topicId2);
        try {
            client.deleteTopic(topicId);
            Assert.fail("Expect TopicNotFoundException");
        } catch (TopicNotFoundException e4) {
        }
        try {
            client.updateTopic(new TopicMetadata(topicId, new Object[0]));
            Assert.fail("Expect TopicNotFoundException");
        } catch (TopicNotFoundException e5) {
        }
        Assert.assertTrue(client.listTopics(namespaceId).isEmpty());
    }

    @Test
    public void testGeMetadata() throws Exception {
        TopicId topicId = new NamespaceId("ns2").topic("d");
        TopicMetadata topicMetadata = new TopicMetadata(topicId, new Object[]{"ttl", "100"});
        for (int i = 1; i <= 5; i++) {
            client.createTopic(topicMetadata);
            Assert.assertEquals(100L, client.getTopic(topicId).getTTL());
            Assert.assertEquals(i, r0.getGeneration());
            client.deleteTopic(topicId);
        }
    }

    @Test
    public void testDeletes() throws Exception {
        TopicId topicId = new NamespaceId("ns1").topic("del");
        TopicMetadata topicMetadata = new TopicMetadata(topicId, new Object[]{"ttl", "100"});
        for (int i = 0; i < 10; i++) {
            client.createTopic(topicMetadata);
            String format = String.format("m%d", Integer.valueOf(i));
            String format2 = String.format("m%d", Integer.valueOf(i + 1));
            Assert.assertNull(client.publish(StoreRequestBuilder.of(topicId).addPayloads(new String[]{format, format2}).build()));
            ArrayList arrayList = new ArrayList();
            CloseableIterator fetch = client.prepareFetch(topicId).fetch();
            Throwable th = null;
            try {
                try {
                    Iterators.addAll(arrayList, fetch);
                    if (fetch != null) {
                        if (0 != 0) {
                            try {
                                fetch.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    Assert.assertEquals(2L, arrayList.size());
                    HashSet hashSet = new HashSet();
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        hashSet.add(Bytes.toString(((RawMessage) it.next()).getPayload()));
                    }
                    Assert.assertTrue(hashSet.contains(format));
                    Assert.assertTrue(hashSet.contains(format2));
                    client.deleteTopic(topicId);
                } finally {
                }
            } catch (Throwable th3) {
                if (fetch != null) {
                    if (th != null) {
                        try {
                            fetch.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        fetch.close();
                    }
                }
                throw th3;
            }
        }
    }

    @Test
    public void testBasicPubSub() throws Exception {
        Throwable th;
        ArrayList arrayList;
        byte[] id;
        CloseableIterator fetch;
        Throwable th2;
        TopicId topicId = new NamespaceId("ns1").topic("testBasicPubSub");
        try {
            client.publish(StoreRequestBuilder.of(topicId).addPayloads(new String[]{"a"}).build());
            Assert.fail("Expected TopicNotFoundException");
        } catch (TopicNotFoundException e) {
        }
        try {
            client.prepareFetch(topicId).fetch();
            Assert.fail("Expected TopicNotFoundException");
        } catch (TopicNotFoundException e2) {
        }
        client.createTopic(new TopicMetadata(topicId, new Object[0]));
        try {
            client.publish(StoreRequestBuilder.of(topicId).build());
            Assert.fail("Expected IllegalArgumentException");
        } catch (IllegalArgumentException e3) {
        }
        Assert.assertNull(client.publish(StoreRequestBuilder.of(topicId).addPayloads(new String[]{"m0", "m1"}).build()));
        RollbackDetail publish = client.publish(StoreRequestBuilder.of(topicId).addPayloads(new String[]{"m2"}).setTransaction(1L).build());
        Assert.assertNotNull(publish);
        client.rollback(topicId, publish);
        ArrayList arrayList2 = new ArrayList();
        CloseableIterator fetch2 = client.prepareFetch(topicId).fetch();
        Throwable th3 = null;
        try {
            try {
                Iterators.addAll(arrayList2, fetch2);
                if (fetch2 != null) {
                    if (0 != 0) {
                        try {
                            fetch2.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    } else {
                        fetch2.close();
                    }
                }
                Assert.assertEquals(3L, arrayList2.size());
                for (int i = 0; i < 3; i++) {
                    Assert.assertEquals("m" + i, Bytes.toString(((RawMessage) arrayList2.get(i)).getPayload()));
                }
                arrayList = new ArrayList();
                fetch2 = client.prepareFetch(topicId).setStartTime(0L).setTransaction(new Transaction(3L, 3L, new long[0], new long[]{2}, 2L)).fetch();
                th = null;
            } finally {
            }
            try {
                try {
                    Iterators.addAll(arrayList, fetch2);
                    if (fetch2 != null) {
                        if (0 != 0) {
                            try {
                                fetch2.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            fetch2.close();
                        }
                    }
                    Assert.assertEquals(2L, arrayList.size());
                    for (int i2 = 0; i2 < 2; i2++) {
                        Assert.assertEquals("m" + i2, Bytes.toString(((RawMessage) arrayList2.get(i2)).getPayload()));
                    }
                    CloseableIterator fetch3 = client.prepareFetch(topicId).setStartMessage(((RawMessage) arrayList2.get(1)).getId(), false).fetch();
                    Throwable th6 = null;
                    try {
                        Assert.assertTrue(fetch3.hasNext());
                        Assert.assertEquals("m2", Bytes.toString(((RawMessage) fetch3.next()).getPayload()));
                        if (fetch3 != null) {
                            if (0 != 0) {
                                try {
                                    fetch3.close();
                                } catch (Throwable th7) {
                                    th6.addSuppressed(th7);
                                }
                            } else {
                                fetch3.close();
                            }
                        }
                        CloseableIterator fetch4 = client.prepareFetch(topicId).setStartMessage(((RawMessage) arrayList2.get(2)).getId(), false).fetch();
                        Throwable th8 = null;
                        try {
                            try {
                                Assert.assertFalse(fetch4.hasNext());
                                if (fetch4 != null) {
                                    if (0 != 0) {
                                        try {
                                            fetch4.close();
                                        } catch (Throwable th9) {
                                            th8.addSuppressed(th9);
                                        }
                                    } else {
                                        fetch4.close();
                                    }
                                }
                                CloseableIterator fetch5 = client.prepareFetch(topicId).setStartTime(new MessageId(((RawMessage) arrayList2.get(1)).getId()).getPublishTimestamp()).setLimit(2).fetch();
                                Throwable th10 = null;
                                try {
                                    try {
                                        arrayList2.clear();
                                        Iterators.addAll(arrayList2, fetch5);
                                        if (fetch5 != null) {
                                            if (0 != 0) {
                                                try {
                                                    fetch5.close();
                                                } catch (Throwable th11) {
                                                    th10.addSuppressed(th11);
                                                }
                                            } else {
                                                fetch5.close();
                                            }
                                        }
                                        Assert.assertEquals(2L, arrayList2.size());
                                        for (int i3 = 0; i3 < 2; i3++) {
                                            Assert.assertEquals("m" + i3, Bytes.toString(((RawMessage) arrayList2.get(i3)).getPayload()));
                                        }
                                        client.publish(StoreRequestBuilder.of(topicId).addPayloads(new String[]{"m3"}).setTransaction(2L).build());
                                        client.publish(StoreRequestBuilder.of(topicId).addPayloads(new String[]{"m4"}).build());
                                        id = ((RawMessage) arrayList2.get(1)).getId();
                                        CloseableIterator fetch6 = client.prepareFetch(topicId).setStartMessage(id, false).fetch();
                                        Throwable th12 = null;
                                        try {
                                            arrayList2.clear();
                                            Iterators.addAll(arrayList2, fetch6);
                                            if (fetch6 != null) {
                                                if (0 != 0) {
                                                    try {
                                                        fetch6.close();
                                                    } catch (Throwable th13) {
                                                        th12.addSuppressed(th13);
                                                    }
                                                } else {
                                                    fetch6.close();
                                                }
                                            }
                                            Assert.assertEquals(3L, arrayList2.size());
                                            for (int i4 = 0; i4 < 3; i4++) {
                                                Assert.assertEquals("m" + (i4 + 2), Bytes.toString(((RawMessage) arrayList2.get(i4)).getPayload()));
                                            }
                                            CloseableIterator fetch7 = client.prepareFetch(topicId).setStartMessage(id, false).setTransaction(new Transaction(3L, 3L, new long[0], new long[]{2}, 2L)).fetch();
                                            Throwable th14 = null;
                                            try {
                                                try {
                                                    Assert.assertFalse(fetch7.hasNext());
                                                    if (fetch7 != null) {
                                                        if (0 != 0) {
                                                            try {
                                                                fetch7.close();
                                                            } catch (Throwable th15) {
                                                                th14.addSuppressed(th15);
                                                            }
                                                        } else {
                                                            fetch7.close();
                                                        }
                                                    }
                                                    fetch = client.prepareFetch(topicId).setStartMessage(id, false).setTransaction(new Transaction(3L, 3L, new long[]{2}, new long[0], 0L)).fetch();
                                                    th2 = null;
                                                } finally {
                                                }
                                            } finally {
                                                if (fetch7 != null) {
                                                    if (th14 != null) {
                                                        try {
                                                            fetch7.close();
                                                        } catch (Throwable th16) {
                                                            th14.addSuppressed(th16);
                                                        }
                                                    } else {
                                                        fetch7.close();
                                                    }
                                                }
                                            }
                                        } catch (Throwable th17) {
                                            if (fetch6 != null) {
                                                if (0 != 0) {
                                                    try {
                                                        fetch6.close();
                                                    } catch (Throwable th18) {
                                                        th12.addSuppressed(th18);
                                                    }
                                                } else {
                                                    fetch6.close();
                                                }
                                            }
                                            throw th17;
                                        }
                                    } finally {
                                    }
                                } finally {
                                    if (fetch5 != null) {
                                        if (th10 != null) {
                                            try {
                                                fetch5.close();
                                            } catch (Throwable th19) {
                                                th10.addSuppressed(th19);
                                            }
                                        } else {
                                            fetch5.close();
                                        }
                                    }
                                }
                            } finally {
                            }
                        } finally {
                            if (fetch4 != null) {
                                if (th8 != null) {
                                    try {
                                        fetch4.close();
                                    } catch (Throwable th20) {
                                        th8.addSuppressed(th20);
                                    }
                                } else {
                                    fetch4.close();
                                }
                            }
                        }
                    } catch (Throwable th21) {
                        if (fetch3 != null) {
                            if (0 != 0) {
                                try {
                                    fetch3.close();
                                } catch (Throwable th22) {
                                    th6.addSuppressed(th22);
                                }
                            } else {
                                fetch3.close();
                            }
                        }
                        throw th21;
                    }
                } finally {
                }
                try {
                    try {
                        arrayList2.clear();
                        Iterators.addAll(arrayList2, fetch);
                        if (fetch != null) {
                            if (0 != 0) {
                                try {
                                    fetch.close();
                                } catch (Throwable th23) {
                                    th2.addSuppressed(th23);
                                }
                            } else {
                                fetch.close();
                            }
                        }
                        Assert.assertEquals(1L, arrayList2.size());
                        Assert.assertEquals("m4", Bytes.toString(((RawMessage) arrayList2.get(0)).getPayload()));
                        CloseableIterator fetch8 = client.prepareFetch(topicId).setStartMessage(id, false).setTransaction(new Transaction(3L, 3L, new long[0], new long[0], 0L)).fetch();
                        Throwable th24 = null;
                        try {
                            try {
                                arrayList2.clear();
                                Iterators.addAll(arrayList2, fetch8);
                                if (fetch8 != null) {
                                    if (0 != 0) {
                                        try {
                                            fetch8.close();
                                        } catch (Throwable th25) {
                                            th24.addSuppressed(th25);
                                        }
                                    } else {
                                        fetch8.close();
                                    }
                                }
                                Assert.assertEquals(2L, arrayList2.size());
                                for (int i5 = 0; i5 < 2; i5++) {
                                    Assert.assertEquals("m" + (i5 + 3), Bytes.toString(((RawMessage) arrayList2.get(i5)).getPayload()));
                                }
                                client.deleteTopic(topicId);
                            } finally {
                            }
                        } finally {
                            if (fetch8 != null) {
                                if (th24 != null) {
                                    try {
                                        fetch8.close();
                                    } catch (Throwable th26) {
                                        th24.addSuppressed(th26);
                                    }
                                } else {
                                    fetch8.close();
                                }
                            }
                        }
                    } finally {
                    }
                } finally {
                    if (fetch != null) {
                        if (th2 != null) {
                            try {
                                fetch.close();
                            } catch (Throwable th27) {
                                th2.addSuppressed(th27);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testChunkConsume() throws Exception {
        TopicId topicId = new NamespaceId("ns1").topic("testChunkConsume");
        client.createTopic(new TopicMetadata(topicId, new Object[0]));
        int i = cConf.getInt("messaging.http.server.consume.chunk.size") / 2;
        for (int i2 = 0; i2 < 10; i2++) {
            client.publish(StoreRequestBuilder.of(topicId).addPayloads(new String[]{Strings.repeat(Integer.toString(i2), i)}).build());
        }
        ArrayList arrayList = new ArrayList();
        CloseableIterator fetch = client.prepareFetch(topicId).fetch();
        Throwable th = null;
        try {
            try {
                Iterators.addAll(arrayList, fetch);
                if (fetch != null) {
                    if (0 != 0) {
                        try {
                            fetch.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fetch.close();
                    }
                }
                Assert.assertEquals(10L, arrayList.size());
                for (int i3 = 0; i3 < 10; i3++) {
                    RawMessage rawMessage = (RawMessage) arrayList.get(i3);
                    Assert.assertEquals(i, rawMessage.getPayload().length);
                    Assert.assertEquals(Strings.repeat(Integer.toString(i3), i), Bytes.toString(rawMessage.getPayload()));
                }
                client.deleteTopic(topicId);
            } finally {
            }
        } catch (Throwable th3) {
            if (fetch != null) {
                if (th != null) {
                    try {
                        fetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testPayloadTable() throws Exception {
        TopicId topicId = new NamespaceId("ns1").topic("testPayloadTable");
        client.createTopic(new TopicMetadata(topicId, new Object[0]));
        try {
            client.storePayload(StoreRequestBuilder.of(topicId).setTransaction(1L).build());
            Assert.fail("Expected IllegalArgumentException");
        } catch (IllegalArgumentException e) {
        }
        for (int i = 0; i < 10; i++) {
            String num = Integer.toString(i);
            client.storePayload(StoreRequestBuilder.of(topicId).addPayloads(new String[]{num, num}).setTransaction(1L).build());
        }
        CloseableIterator fetch = client.prepareFetch(topicId).fetch();
        Throwable th = null;
        try {
            try {
                Assert.assertFalse(fetch.hasNext());
                if (fetch != null) {
                    if (0 != 0) {
                        try {
                            fetch.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fetch.close();
                    }
                }
                client.publish(StoreRequestBuilder.of(topicId).setTransaction(1L).build());
                ArrayList arrayList = new ArrayList();
                fetch = client.prepareFetch(topicId).fetch();
                Throwable th3 = null;
                try {
                    try {
                        Iterators.addAll(arrayList, fetch);
                        if (fetch != null) {
                            if (0 != 0) {
                                try {
                                    fetch.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                fetch.close();
                            }
                        }
                        Assert.assertEquals(20L, arrayList.size());
                        for (int i2 = 0; i2 < 20; i2 += 2) {
                            String bytes = Bytes.toString(((RawMessage) arrayList.get(i2)).getPayload());
                            Assert.assertEquals(bytes, Bytes.toString(((RawMessage) arrayList.get(i2 + 1)).getPayload()));
                            Assert.assertEquals(Integer.toString(i2 / 2), bytes);
                        }
                        arrayList.clear();
                        CloseableIterator fetch2 = client.prepareFetch(topicId).setLimit(6).fetch();
                        Throwable th5 = null;
                        try {
                            try {
                                Iterators.addAll(arrayList, fetch2);
                                if (fetch2 != null) {
                                    if (0 != 0) {
                                        try {
                                            fetch2.close();
                                        } catch (Throwable th6) {
                                            th5.addSuppressed(th6);
                                        }
                                    } else {
                                        fetch2.close();
                                    }
                                }
                                Assert.assertEquals(6L, arrayList.size());
                                for (int i3 = 0; i3 < 6; i3 += 2) {
                                    String bytes2 = Bytes.toString(((RawMessage) arrayList.get(i3)).getPayload());
                                    Assert.assertEquals(bytes2, Bytes.toString(((RawMessage) arrayList.get(i3 + 1)).getPayload()));
                                    Assert.assertEquals(Integer.toString(i3 / 2), bytes2);
                                }
                                client.deleteTopic(topicId);
                            } finally {
                            }
                        } finally {
                            if (fetch2 != null) {
                                if (th5 != null) {
                                    try {
                                        fetch2.close();
                                    } catch (Throwable th7) {
                                        th5.addSuppressed(th7);
                                    }
                                } else {
                                    fetch2.close();
                                }
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }
}
