package co.cask.cdap.test.messaging;

import co.cask.cdap.api.app.Application;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.messaging.MessageFetcher;
import co.cask.cdap.api.messaging.MessagePublisher;
import co.cask.cdap.api.messaging.MessagingAdmin;
import co.cask.cdap.api.messaging.MessagingContext;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.SparkManager;
import co.cask.cdap.test.TestConfiguration;
import co.cask.cdap.test.WorkerManager;
import co.cask.cdap.test.base.TestFrameworkTestBase;
import co.cask.cdap.test.messaging.MessagingApp;
import com.google.common.collect.Iterators;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/test/messaging/MessagingAppTestRun.class */
public class MessagingAppTestRun extends TestFrameworkTestBase {

    @ClassRule
    public static final TestConfiguration CONFIG = new TestConfiguration(new Object[]{"explore.enabled", false});
    private static final NamespaceId NAMESPACE = new NamespaceId("messageTest");
    private static File artifactJar;

    @BeforeClass
    public static void init() throws Exception {
        artifactJar = createArtifactJar(MessagingApp.class);
    }

    @Before
    public void beforeTest() throws Exception {
        super.beforeTest();
        getNamespaceAdmin().create(new NamespaceMeta.Builder().setName(NAMESPACE).build());
        getMessagingAdmin(NAMESPACE).createTopic("controlTopic");
    }

    @Test
    public void testWithWorker() throws Exception {
        WorkerManager start = deployWithArtifact(NAMESPACE, (Class<? extends Application>) MessagingApp.class, artifactJar).getWorkerManager(MessagingApp.MessagingWorker.class.getSimpleName()).start();
        MessagingContext messagingContext = getMessagingContext();
        final MessagingAdmin messagingAdmin = getMessagingAdmin(NAMESPACE);
        Tasks.waitFor(true, new Callable<Boolean>() { // from class: co.cask.cdap.test.messaging.MessagingAppTestRun.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                try {
                    messagingAdmin.getTopicProperties("topic");
                    return true;
                } catch (TopicNotFoundException e) {
                    return false;
                }
            }
        }, 5L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        MessagePublisher messagePublisher = messagingContext.getMessagePublisher();
        messagePublisher.publish(NAMESPACE.getNamespace(), "topic", new String[]{"message"});
        final MessageFetcher messageFetcher = messagingContext.getMessageFetcher();
        Tasks.waitFor("messagemessage", new Callable<String>() { // from class: co.cask.cdap.test.messaging.MessagingAppTestRun.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                CloseableIterator fetch = messageFetcher.fetch(MessagingAppTestRun.NAMESPACE.getNamespace(), "topic", Integer.MAX_VALUE, 0L);
                Throwable th = null;
                try {
                    Message message = (Message) Iterators.getLast(fetch, (Object) null);
                    return message == null ? null : message.getPayloadAsString();
                } finally {
                    if (fetch != null) {
                        if (0 != 0) {
                            try {
                                fetch.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                }
            }
        }, 5L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        messagePublisher.publish(NAMESPACE.getNamespace(), "topic", new String[]{"messagemessage"});
        try {
            Tasks.waitFor("messagemessagemessagemessage", new Callable<String>() { // from class: co.cask.cdap.test.messaging.MessagingAppTestRun.3
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public String call() throws Exception {
                    CloseableIterator fetch = messageFetcher.fetch(MessagingAppTestRun.NAMESPACE.getNamespace(), "topic", Integer.MAX_VALUE, 0L);
                    Throwable th = null;
                    try {
                        Message message = (Message) Iterators.getLast(fetch, (Object) null);
                        return message == null ? null : message.getPayloadAsString();
                    } finally {
                        if (fetch != null) {
                            if (0 != 0) {
                                try {
                                    fetch.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                fetch.close();
                            }
                        }
                    }
                }
            }, 2L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
            Assert.fail("Expected timeout exception");
        } catch (TimeoutException e) {
        }
        messagePublisher.publish(NAMESPACE.getNamespace(), "controlTopic", new String[]{"message"});
        Tasks.waitFor("messagemessagemessagemessage", new Callable<String>() { // from class: co.cask.cdap.test.messaging.MessagingAppTestRun.4
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                CloseableIterator fetch = messageFetcher.fetch(MessagingAppTestRun.NAMESPACE.getNamespace(), "topic", Integer.MAX_VALUE, 0L);
                Throwable th = null;
                try {
                    Message message = (Message) Iterators.getLast(fetch, (Object) null);
                    return message == null ? null : message.getPayloadAsString();
                } finally {
                    if (fetch != null) {
                        if (0 != 0) {
                            try {
                                fetch.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                }
            }
        }, 5L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        start.waitForRun(ProgramRunStatus.COMPLETED, 5L, TimeUnit.SECONDS);
    }

    @Test
    public void testTxPublishFetch() throws Exception {
        ApplicationManager deployWithArtifact = deployWithArtifact(NAMESPACE, (Class<? extends Application>) MessagingApp.class, artifactJar);
        MessagingAdmin messagingAdmin = getMessagingAdmin(NAMESPACE);
        WorkerManager workerManager = deployWithArtifact.getWorkerManager(MessagingApp.TransactionalMessagingWorker.class.getSimpleName());
        Iterator it = Arrays.asList(true, false).iterator();
        while (it.hasNext()) {
            boolean booleanValue = ((Boolean) it.next()).booleanValue();
            messagingAdmin.createTopic("topic");
            workerManager.start(Collections.singletonMap("get.in.tx", Boolean.toString(booleanValue)));
            workerManager.waitForRuns(ProgramRunStatus.COMPLETED, booleanValue ? 1 : 2, 60L, TimeUnit.SECONDS);
            messagingAdmin.deleteTopic("topic");
        }
    }

    @Test
    public void testSparkMessaging() throws Exception {
        SparkManager start = deployWithArtifact(NAMESPACE, (Class<? extends Application>) MessagingApp.class, artifactJar).getSparkManager(MessagingSpark.class.getSimpleName()).start();
        final MessageFetcher messageFetcher = getMessagingContext().getMessageFetcher();
        final AtomicReference atomicReference = new AtomicReference();
        final MessagingAdmin messagingAdmin = getMessagingAdmin(NAMESPACE.getNamespace());
        Tasks.waitFor(true, new Callable<Boolean>() { // from class: co.cask.cdap.test.messaging.MessagingAppTestRun.5
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                try {
                    messagingAdmin.getTopicProperties("topic");
                    return true;
                } catch (TopicNotFoundException e) {
                    return false;
                }
            }
        }, 60L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        Iterator it = Arrays.asList("start", "block").iterator();
        while (it.hasNext()) {
            Tasks.waitFor((String) it.next(), new Callable<String>() { // from class: co.cask.cdap.test.messaging.MessagingAppTestRun.6
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public String call() throws Exception {
                    CloseableIterator fetch = messageFetcher.fetch(MessagingAppTestRun.NAMESPACE.getNamespace(), "topic", 1, (String) atomicReference.get());
                    Throwable th = null;
                    try {
                        if (!fetch.hasNext()) {
                            return null;
                        }
                        Message message = (Message) fetch.next();
                        atomicReference.set(message.getId());
                        String payloadAsString = message.getPayloadAsString();
                        if (fetch != null) {
                            if (0 != 0) {
                                try {
                                    fetch.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                fetch.close();
                            }
                        }
                        return payloadAsString;
                    } finally {
                        if (fetch != null) {
                            if (0 != 0) {
                                try {
                                    fetch.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            } else {
                                fetch.close();
                            }
                        }
                    }
                }
            }, 60L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        }
        getMessagingContext().getMessagePublisher().publish(NAMESPACE.getNamespace(), "controlTopic", new String[]{"go"});
        Tasks.waitFor("result-15", new Callable<String>() { // from class: co.cask.cdap.test.messaging.MessagingAppTestRun.7
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                CloseableIterator fetch = messageFetcher.fetch(MessagingAppTestRun.NAMESPACE.getNamespace(), "topic", 1, (String) atomicReference.get());
                Throwable th = null;
                try {
                    if (!fetch.hasNext()) {
                        return null;
                    }
                    Message message = (Message) fetch.next();
                    atomicReference.set(message.getId());
                    String payloadAsString = message.getPayloadAsString();
                    if (fetch != null) {
                        if (0 != 0) {
                            try {
                                fetch.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    return payloadAsString;
                } finally {
                    if (fetch != null) {
                        if (0 != 0) {
                            try {
                                fetch.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                }
            }
        }, 60L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        start.waitForRun(ProgramRunStatus.COMPLETED, 60L, TimeUnit.SECONDS);
    }
}
