package co.cask.cdap.flow.stream;

import co.cask.cdap.api.annotation.Batch;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.flow.AbstractFlow;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/flow/stream/TestFlowStreamIntegrationApp.class */
public class TestFlowStreamIntegrationApp extends AbstractApplication {
    private static final Logger LOG = LoggerFactory.getLogger(TestFlowStreamIntegrationApp.class);

    /* loaded from: input_file:co/cask/cdap/flow/stream/TestFlowStreamIntegrationApp$StreamReader.class */
    public static class StreamReader extends AbstractFlowlet {
        @ProcessInput
        @Batch(100)
        public void foo(Iterator<StreamEvent> it) {
            ImmutableList copyOf = ImmutableList.copyOf(it);
            TestFlowStreamIntegrationApp.LOG.warn("Number of batched stream events = " + copyOf.size());
            Preconditions.checkState(copyOf.size() > 1);
            ArrayList newArrayList = Lists.newArrayList();
            Iterator it2 = copyOf.iterator();
            while (it2.hasNext()) {
                newArrayList.add(Integer.valueOf(Integer.parseInt(Charsets.UTF_8.decode((ByteBuffer) ((StreamEvent) it2.next()).getBody()).toString())));
            }
            TestFlowStreamIntegrationApp.LOG.info("Read events=" + newArrayList);
        }
    }

    /* loaded from: input_file:co/cask/cdap/flow/stream/TestFlowStreamIntegrationApp$StreamTestFlow.class */
    public static class StreamTestFlow extends AbstractFlow {
        protected void configureFlow() {
            setName("StreamTestFlow");
            setDescription("Flow for testing batch stream dequeue");
            addFlowlet(new StreamReader());
            connectStream("s1", "StreamReader");
        }
    }

    public void configure() {
        setName("TestFlowStreamIntegrationApp");
        setDescription("Application for testing batch stream dequeue");
        addStream(new Stream("s1"));
        addFlow(new StreamTestFlow());
    }
}
