package co.cask.cdap.flow.stream;

import co.cask.cdap.api.metrics.RuntimeMetrics;
import co.cask.cdap.batch.stream.NoMapperStreamSpaceApp;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.FlowManager;
import co.cask.cdap.test.SlowTests;
import co.cask.cdap.test.StreamManager;
import co.cask.cdap.test.base.TestFrameworkTestBase;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({SlowTests.class})
/* loaded from: input_file:co/cask/cdap/flow/stream/FlowStreamIntegrationTestRun.class */
public class FlowStreamIntegrationTestRun extends TestFrameworkTestBase {
    @Test
    public void testStreamBatch() throws Exception {
        ApplicationManager deployApplication = deployApplication(TestFlowStreamIntegrationApp.class, new File[0]);
        StreamManager streamManager = getStreamManager("s1");
        for (int i = 0; i < 50; i++) {
            streamManager.send(String.valueOf(i));
        }
        submitAndVerifyFlowProgram(deployApplication.getFlowManager("StreamTestFlow"));
    }

    @Test
    public void testStreamFromOtherNamespaceBatch() throws Exception {
        NamespaceId namespaceId = new NamespaceId(NoMapperStreamSpaceApp.INPUTSTREAMSPACE);
        getNamespaceAdmin().create(new NamespaceMeta.Builder().setName(namespaceId).build());
        deployApplication(namespaceId, TestFlowStreamIntegrationAcrossNSApp.class, new File[0]);
        ApplicationManager deployApplication = deployApplication(TestFlowStreamIntegrationAcrossNSApp.class, new File[0]);
        StreamManager streamManager = getStreamManager(namespaceId.stream("s1"));
        StreamManager streamManager2 = getStreamManager("s1");
        for (int i = 0; i < 50; i++) {
            streamManager.send(String.valueOf(i));
            streamManager2.send(String.valueOf(i));
        }
        submitAndVerifyFlowProgram(deployApplication.getFlowManager("StreamAcrossNSTestFlow"));
    }

    private void submitAndVerifyFlowProgram(FlowManager flowManager) throws Exception {
        flowManager.start();
        RuntimeMetrics flowletMetrics = flowManager.getFlowletMetrics("StreamReader");
        flowletMetrics.waitForProcessed(1L, 10L, TimeUnit.SECONDS);
        if (flowletMetrics.getException() > 0) {
            Assert.fail("StreamReader test failed");
        }
    }
}
