package co.cask.cdap.app.runtime.monitor;

import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.runtime.NoOpProgramStateWriter;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.datafabric.dataset.service.DatasetService;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.TransactionSystemClientAdapter;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.internal.app.program.MessagingProgramStateWriter;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.runtime.distributed.remote.RemoteProcessController;
import co.cask.cdap.internal.app.runtime.monitor.RuntimeMonitor;
import co.cask.cdap.internal.app.runtime.monitor.RuntimeMonitorClient;
import co.cask.cdap.internal.app.runtime.monitor.RuntimeMonitorServer;
import co.cask.cdap.internal.guice.AppFabricTestModule;
import co.cask.cdap.internal.profile.ProfileMetricService;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.context.MultiThreadMessagingContext;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProfileId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.security.tools.KeyStores;
import co.cask.common.http.HttpRequestConfig;
import com.google.common.util.concurrent.Service;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.PrivateModule;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterators;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.tephra.RetryStrategies;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/app/runtime/monitor/RuntimeMonitorTest.class */
public class RuntimeMonitorTest {
    // Configuration used by the tests; assigned a fresh instance in init() before every test.
    private static CConfiguration cConf;
    // Messaging service obtained from the injector in init(); started/stopped only when it is a Guava Service.
    private static MessagingService messagingService;
    // Holds the run for which the Cancellable bound in init() should publish a KILLED status (at most once).
    private final AtomicReference<ProgramRunId> publishProgramKilled = new AtomicReference<>();
    // Server side of the runtime monitor; started in init() and stopped in stop().
    private RuntimeMonitorServer runtimeServer;
    // Messaging context wrapping messagingService; used to publish and fetch messages in the tests.
    private MultiThreadMessagingContext messagingContext;
    // Transaction manager started in init() and stopped in stop().
    private TransactionManager txManager;
    // Dataset service started in init() and stopped in stop().
    private DatasetService datasetService;
    // Dataset framework obtained from the injector; handed to each RuntimeMonitor under test.
    private DatasetFramework datasetFramework;
    // Transactional created in init() with retry-on-conflict; used by the monitors and by verifyPublishedMessages.
    private Transactional transactional;
    // Metrics collection service obtained from the injector; passed to ProfileMetricService in the tests.
    private MetricsCollectionService metricsCollectionService;
    // Generated key store bound as the server key store in init(); also passed to RuntimeMonitorClient.
    private KeyStore serverKeyStore;
    // Generated key store bound as the server trust store in init(); also passed to RuntimeMonitorClient.
    private KeyStore clientKeyStore;

    // Class-level temporary folder; init() creates a new sub-folder in it for "local.data.dir".
    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: co.cask.cdap.app.runtime.monitor.RuntimeMonitorTest$3, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/app/runtime/monitor/RuntimeMonitorTest$3.class */
    /**
     * Decompiler rendering of a compiler-synthesised lookup table for {@link ProgramStatus}.
     *
     * <p>The array is indexed by {@code ProgramStatus.ordinal()} and holds a case index:
     * INITIALIZING=1, RUNNING=2, COMPLETED=3, FAILED=4, KILLED=5. Entries for any other
     * constant keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass3 {
        // One slot per ProgramStatus constant; slots not assigned below stay 0.
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$api$ProgramStatus = new int[ProgramStatus.values().length];

        static {
            // Each assignment is wrapped individually so that a NoSuchFieldError on one constant
            // (presumably when the enum at runtime lacks it) does not prevent the remaining
            // entries from being populated. The errors are deliberately ignored.
            try {
                $SwitchMap$co$cask$cdap$api$ProgramStatus[ProgramStatus.INITIALIZING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$ProgramStatus[ProgramStatus.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$ProgramStatus[ProgramStatus.COMPLETED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$ProgramStatus[ProgramStatus.FAILED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$ProgramStatus[ProgramStatus.KILLED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/app/runtime/monitor/RuntimeMonitorTest$MockRemoteProcessController.class */
    /**
     * Minimal {@link RemoteProcessController} stand-in handed to every {@code RuntimeMonitor}
     * created by these tests. It always reports the remote process as not running and its
     * {@link #kill()} does nothing.
     */
    private static final class MockRemoteProcessController implements RemoteProcessController {
        // Never assigned anywhere, so it permanently holds the default value false.
        private boolean running;

        private MockRemoteProcessController() {
            // Nothing to initialise.
        }

        /**
         * @return the value of the {@code running} flag, which is always {@code false}
         */
        public boolean isRunning() throws Exception {
            return running;
        }

        /** Intentionally a no-op: there is no real process behind this mock. */
        public void kill() throws Exception {
            // no-op
        }
    }

    /**
     * Builds a fresh environment before each test.
     *
     * <p>Creates a new configuration pointing at a new temporary data directory, generates the
     * server and client key stores, creates the Guice injector, then starts (in this order) the
     * messaging service (if it is a Guava {@link Service}), the transaction manager, the dataset
     * service and the {@link RuntimeMonitorServer}.
     *
     * @throws IOException if the temporary data directory cannot be created
     */
    @Before
    public void init() throws IOException {
        cConf = CConfiguration.create();
        cConf.set("local.data.dir", TMP_FOLDER.newFolder().getAbsolutePath());
        // No fixed host and port "0": the tests read the actual bind address back from the server.
        cConf.unset("app.program.runtime.monitor.server.host");
        cConf.set("app.program.runtime.monitor.server.port", "0");
        cConf.set("app.program.runtime.monitor.batch.size", "2");
        cConf.set("app.program.runtime.monitor.polltime.ms", "200");
        cConf.set("app.program.runtime.monitor.graceful.shutdown.ms", "1000");

        this.serverKeyStore = KeyStores.generatedCertKeyStore(1, "");
        this.clientKeyStore = KeyStores.generatedCertKeyStore(1, "");

        Injector injector = Guice.createInjector(new AppFabricTestModule(cConf), new PrivateModule() { // from class: co.cask.cdap.app.runtime.monitor.RuntimeMonitorTest.1
            protected void configure() {
                bind(KeyStore.class).annotatedWith(Constants.AppFabric.KeyStore.class).toInstance(RuntimeMonitorTest.this.serverKeyStore);
                bind(KeyStore.class).annotatedWith(Constants.AppFabric.TrustStore.class).toInstance(RuntimeMonitorTest.this.clientKeyStore);
                bind(RuntimeMonitorServer.class);
                expose(RuntimeMonitorServer.class);
                // When cancelled, publish KILLED for the run stored in publishProgramKilled.
                // compareAndSet clears the reference so the status is published at most once per run.
                bind(Cancellable.class).toInstance(() -> {
                    ProgramRunId killedRun = RuntimeMonitorTest.this.publishProgramKilled.get();
                    if (killedRun != null && RuntimeMonitorTest.this.publishProgramKilled.compareAndSet(killedRun, null)) {
                        RuntimeMonitorTest.this.publishProgramStatus(killedRun, ProgramStatus.KILLED);
                    }
                });
            }
        });

        messagingService = injector.getInstance(MessagingService.class);
        if (messagingService instanceof Service) {
            // startAndWait() is a Service method, not a MessagingService one, hence the cast.
            ((Service) messagingService).startAndWait();
        }
        this.messagingContext = new MultiThreadMessagingContext(messagingService);

        this.txManager = injector.getInstance(TransactionManager.class);
        this.txManager.startAndWait();
        TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class);

        this.datasetFramework = injector.getInstance(DatasetFramework.class);
        // The messaging context is registered as an extra transaction-aware so that messaging
        // operations take part in the transactions run through this.transactional.
        MultiThreadDatasetCache datasetCache = new MultiThreadDatasetCache(
            new SystemDatasetInstantiator(this.datasetFramework),
            new TransactionSystemClientAdapter(txClient),
            NamespaceId.SYSTEM,
            Collections.emptyMap(),
            (MetricsContext) null,
            (Map) null,
            new MultiThreadTransactionAware[]{this.messagingContext});
        this.transactional = Transactions.createTransactionalWithRetry(
            Transactions.createTransactional(datasetCache),
            RetryStrategies.retryOnConflict(20, 100L));

        this.datasetService = injector.getInstance(DatasetService.class);
        this.datasetService.startAndWait();

        this.runtimeServer = injector.getInstance(RuntimeMonitorServer.class);
        this.runtimeServer.startAndWait();

        this.metricsCollectionService = injector.getInstance(MetricsCollectionService.class);
    }

    /**
     * Tears down the services started in {@code init()}, in reverse order of start-up:
     * runtime monitor server, dataset service, transaction manager and finally the messaging
     * service (only when it is a Guava {@link Service}).
     */
    @After
    public void stop() {
        this.runtimeServer.stopAndWait();
        this.datasetService.stopAndWait();
        this.txManager.stopAndWait();
        if (messagingService instanceof Service) {
            // stopAndWait() is a Service method, not a MessagingService one, hence the cast.
            ((Service) messagingService).stopAndWait();
        }
    }

    /**
     * Exercises a monitor across a restart: publishes status messages, starts a
     * {@link RuntimeMonitor}, stops it, publishes more messages, starts a second monitor, and
     * finally publishes COMPLETED and waits for both the server and the monitor to terminate.
     *
     * <p>After each step the expected number of messages is verified on the topic of the
     * relevant configuration, resuming from the last message id seen.
     *
     * @throws Exception if any verification times out or a service call fails
     */
    @Test
    public void testRunTimeMonitor() throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5, Threads.createDaemonThreadFactory("test"));
        try {
            ProgramRunId programRunId = NamespaceId.DEFAULT.app("app1").workflow("myworkflow").run(RunIds.generate());
            ProfileId profileId = NamespaceId.DEFAULT.profile("myProfile");

            // Two status messages are published and must be visible on the topic configured in cConf.
            publishProgramStatus(programRunId, ProgramStatus.INITIALIZING);
            publishProgramStatus(programRunId, ProgramStatus.RUNNING);
            verifyPublishedMessages(cConf, 2, null);

            // The monitor gets its own configuration with a different program status topic.
            CConfiguration monitorConf = CConfiguration.copy(cConf);
            monitorConf.set("program.status.event.topic", "cdap-programStatus");
            messagingService.createTopic(new TopicMetadata(NamespaceId.SYSTEM.topic("cdap-programStatus"), new Object[0]));

            RuntimeMonitorClient monitorClient = new RuntimeMonitorClient(
                this.runtimeServer.getBindAddress().getHostName(),
                this.runtimeServer.getBindAddress().getPort(),
                HttpRequestConfig.DEFAULT, this.clientKeyStore, this.serverKeyStore);
            ProfileMetricService profileMetricService =
                new ProfileMetricService(this.metricsCollectionService, programRunId, profileId, 1, scheduler);

            RuntimeMonitor firstMonitor = new RuntimeMonitor(programRunId, monitorConf, monitorClient, this.datasetFramework,
                this.transactional, this.messagingContext, scheduler, it -> {
                }, profileMetricService, new MockRemoteProcessController(), new NoOpProgramStateWriter());
            firstMonitor.startAndWait();
            // Two messages are expected on the monitor's topic once the first monitor is running.
            String lastSeenId = verifyPublishedMessages(monitorConf, 2, null);
            firstMonitor.stopAndWait();

            // Publish two more messages while no monitor is running.
            publishProgramStatus(programRunId, ProgramStatus.RUNNING);
            publishProgramStatus(programRunId, ProgramStatus.RUNNING);
            verifyPublishedMessages(cConf, 2, lastSeenId);

            RuntimeMonitor secondMonitor = new RuntimeMonitor(programRunId, monitorConf, monitorClient, this.datasetFramework,
                this.transactional, this.messagingContext, scheduler, it -> {
                }, profileMetricService, new MockRemoteProcessController(), new NoOpProgramStateWriter());
            secondMonitor.startAndWait();
            // The second monitor is expected to deliver exactly the two new messages after lastSeenId.
            String lastSeenIdAfterRestart = verifyPublishedMessages(monitorConf, 2, lastSeenId);

            publishProgramStatus(programRunId, ProgramStatus.COMPLETED);
            verifyPublishedMessages(monitorConf, 1, lastSeenIdAfterRestart);

            // After COMPLETED both the server and the monitor are expected to terminate.
            Tasks.waitFor(Service.State.TERMINATED, this.runtimeServer::state, 10L, TimeUnit.SECONDS);
            Tasks.waitFor(Service.State.TERMINATED, secondMonitor::state, 10L, TimeUnit.SECONDS);
        } finally {
            // Release the pool threads whether the test passed or failed.
            scheduler.shutdownNow();
        }
    }

    /**
     * Publishes one payload ({@code "test" + n}) to each metrics topic named with the prefix
     * from {@code cConf}, starts a {@link RuntimeMonitor} whose configuration uses a
     * {@code "cdap-"}-prefixed metrics topic prefix, and waits until the first message of every
     * prefixed topic carries the matching payload. Finally publishes COMPLETED and waits for the
     * server and the monitor to terminate.
     *
     * @throws Exception if any wait times out or a service call fails
     */
    @Test
    public void testTopicExpansion() throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5, Threads.createDaemonThreadFactory("test"));
        try {
            ProgramRunId programRunId = NamespaceId.DEFAULT.app("app1").workflow("testTopicExpansion").run(RunIds.generate());
            ProfileId profileId = NamespaceId.DEFAULT.profile("myProfile");

            CConfiguration monitorConf = CConfiguration.copy(cConf);
            monitorConf.set("program.status.event.topic", "cdap-programStatus");
            messagingService.createTopic(new TopicMetadata(NamespaceId.SYSTEM.topic("cdap-programStatus"), new Object[0]));

            String sourcePrefix = cConf.get("metrics.topic.prefix");
            int numTopics = monitorConf.getInt("metrics.messaging.topic.num");
            String targetPrefix = "cdap-" + sourcePrefix;
            monitorConf.set("metrics.topic.prefix", targetPrefix);

            // Create the prefixed destination topics explicitly.
            for (int topic = 0; topic < numTopics; topic++) {
                messagingService.createTopic(new TopicMetadata(NamespaceId.SYSTEM.topic(targetPrefix + topic), new Object[0]));
            }
            // Publish one distinct payload to each source topic.
            for (int topic = 0; topic < numTopics; topic++) {
                this.messagingContext.getMessagePublisher()
                    .publish(NamespaceId.SYSTEM.getNamespace(), sourcePrefix + topic, "test" + topic);
            }

            RuntimeMonitorClient monitorClient = new RuntimeMonitorClient(
                this.runtimeServer.getBindAddress().getHostName(),
                this.runtimeServer.getBindAddress().getPort(),
                HttpRequestConfig.DEFAULT, this.clientKeyStore, this.serverKeyStore);
            RuntimeMonitor runtimeMonitor = new RuntimeMonitor(programRunId, monitorConf, monitorClient, this.datasetFramework,
                this.transactional, this.messagingContext, scheduler, it -> {
                }, new ProfileMetricService(this.metricsCollectionService, programRunId, profileId, 1, scheduler),
                new MockRemoteProcessController(), new NoOpProgramStateWriter());
            runtimeMonitor.startAndWait();

            // Poll until the first message of every destination topic has the expected payload.
            Tasks.waitFor(true, () -> {
                for (int topic = 0; topic < numTopics; topic++) {
                    try (CloseableIterator<Message> messages = this.messagingContext.getMessageFetcher()
                        .fetch(NamespaceId.SYSTEM.getNamespace(), targetPrefix + topic, 10, (String) null)) {
                        String firstPayload = messages.hasNext() ? messages.next().getPayloadAsString() : null;
                        if (!("test" + topic).equals(firstPayload)) {
                            return false;
                        }
                    }
                }
                return true;
            }, 1000L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);

            publishProgramStatus(programRunId, ProgramStatus.COMPLETED);
            Tasks.waitFor(Service.State.TERMINATED, this.runtimeServer::state, 10L, TimeUnit.SECONDS);
            Tasks.waitFor(Service.State.TERMINATED, runtimeMonitor::state, 10L, TimeUnit.SECONDS);
        } finally {
            // Release the pool threads whether the test passed or failed.
            scheduler.shutdownNow();
        }
    }

    /**
     * Starts a {@link RuntimeMonitor} for a run with two published status messages, verifies
     * that two messages arrive on the monitor's topic, then requests a stop. Before the stop is
     * requested the run is stored in {@code publishProgramKilled} so that the {@code Cancellable}
     * bound in {@code init()} publishes KILLED for it when invoked. The test passes when both
     * the server and the monitor reach TERMINATED.
     *
     * @throws Exception if any wait times out or a service call fails
     */
    @Test
    public void testStopProgram() throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5, Threads.createDaemonThreadFactory("test"));
        try {
            ProgramRunId programRunId = NamespaceId.DEFAULT.app("app1").workflow("testkill").run(RunIds.generate());
            ProfileId profileId = NamespaceId.DEFAULT.profile("myProfile");

            publishProgramStatus(programRunId, ProgramStatus.INITIALIZING);
            publishProgramStatus(programRunId, ProgramStatus.RUNNING);

            CConfiguration monitorConf = CConfiguration.copy(cConf);
            monitorConf.set("program.status.event.topic", "cdap-programStatus");
            messagingService.createTopic(new TopicMetadata(NamespaceId.SYSTEM.topic("cdap-programStatus"), new Object[0]));

            RuntimeMonitorClient monitorClient = new RuntimeMonitorClient(
                this.runtimeServer.getBindAddress().getHostName(),
                this.runtimeServer.getBindAddress().getPort(),
                HttpRequestConfig.DEFAULT, this.clientKeyStore, this.serverKeyStore);
            RuntimeMonitor runtimeMonitor = new RuntimeMonitor(programRunId, monitorConf, monitorClient, this.datasetFramework,
                this.transactional, this.messagingContext, scheduler, it -> {
                }, new ProfileMetricService(this.metricsCollectionService, programRunId, profileId, 1, scheduler),
                new MockRemoteProcessController(), new NoOpProgramStateWriter());
            runtimeMonitor.startAndWait();
            verifyPublishedMessages(monitorConf, 2, null);

            // Arm the Cancellable before asking the monitor to stop.
            this.publishProgramKilled.set(programRunId);
            runtimeMonitor.requestStop();

            Tasks.waitFor(Service.State.TERMINATED, this.runtimeServer::state, 10L, TimeUnit.SECONDS);
            Tasks.waitFor(Service.State.TERMINATED, runtimeMonitor::state, 10L, TimeUnit.SECONDS);
        } finally {
            // Release the pool threads whether the test passed or failed.
            scheduler.shutdownNow();
        }
    }

    /**
     * Polls the program status topic until a single fetch returns exactly the expected number
     * of messages, waiting up to five minutes.
     *
     * <p>Every attempt fetches (inside a transaction) up to 100 messages that follow
     * {@code str}. The counter is reset at the start of every attempt, so a poll that observes
     * only part of the messages cannot inflate the total of a later poll.
     *
     * @param cConfiguration configuration supplying the topic name via
     *                       {@code "program.status.event.topic"}
     * @param i expected number of messages after {@code str}
     * @param str id of the message to start fetching after, or {@code null} to fetch from the
     *            beginning of the topic
     * @return the id of the last message seen, or {@code null} if no message was ever fetched
     * @throws Exception if the expected count is not reached in time or fetching fails
     */
    private String verifyPublishedMessages(final CConfiguration cConfiguration, final int i, @Nullable final String str) throws Exception {
        // Single-element arrays act as mutable holders that the lambdas below can write to.
        final String[] lastMessageId = {null};
        final int[] fetchedCount = {0};
        Tasks.waitFor(true, () -> {
            this.transactional.execute(datasetContext -> {
                // Reset here (not outside) so that a retried transaction starts from zero as well.
                fetchedCount[0] = 0;
                try (CloseableIterator<Message> messages = this.messagingContext.getMessageFetcher().fetch(
                    NamespaceId.SYSTEM.getNamespace(), cConfiguration.get("program.status.event.topic"), 100, str)) {
                    while (messages.hasNext()) {
                        lastMessageId[0] = messages.next().getId();
                        fetchedCount[0]++;
                    }
                }
            });
            return fetchedCount[0] == i;
        }, 5L, TimeUnit.MINUTES);
        return lastMessageId[0];
    }

    /**
     * Publishes a program state change for the given run through a new
     * {@link MessagingProgramStateWriter} built from the shared configuration and messaging
     * service.
     *
     * <p>Note: the decompiler widened this method from private to public.
     *
     * @param programRunId the run whose state is being published
     * @param programStatus the state to publish; only INITIALIZING, RUNNING, COMPLETED, FAILED
     *                      and KILLED are supported
     * @throws IllegalArgumentException if {@code programStatus} is any other constant
     */
    public void publishProgramStatus(ProgramRunId programRunId, ProgramStatus programStatus) {
        MessagingProgramStateWriter stateWriter = new MessagingProgramStateWriter(cConf, messagingService);
        switch (programStatus) {
            case INITIALIZING:
                stateWriter.start(programRunId, new SimpleProgramOptions(programRunId.getParent()), (String) null, (ProgramDescriptor) null);
                break;
            case RUNNING:
                stateWriter.running(programRunId, (String) null);
                break;
            case COMPLETED:
                stateWriter.completed(programRunId);
                break;
            case FAILED:
                stateWriter.error(programRunId, new Exception("Program run failed"));
                break;
            case KILLED:
                stateWriter.killed(programRunId);
                break;
            default:
                throw new IllegalArgumentException("Unsupported program status " + programStatus);
        }
    }
}
