package co.cask.cdap.stream.app;

import co.cask.cdap.AllProgramsApp;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.flow.Flow;
import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import com.google.common.base.Charsets;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/stream/app/StreamApp.class */
public final class StreamApp extends AbstractApplication {

    /* loaded from: input_file:co/cask/cdap/stream/app/StreamApp$StreamFlow.class */
    public static final class StreamFlow implements Flow {
        public FlowSpecification configure() {
            return FlowSpecification.Builder.with().setName("StreamFlow").setDescription("StreamFlow").withFlowlets().add("reader", new StreamReader()).connect().fromStream(AllProgramsApp.STREAM_NAME).to("reader").build();
        }
    }

    /* loaded from: input_file:co/cask/cdap/stream/app/StreamApp$StreamReader.class */
    public static final class StreamReader extends AbstractFlowlet {
        private static final Logger LOG = LoggerFactory.getLogger(StreamReader.class);

        @UseDataSet("streamout")
        KeyValueTable keyValueTable;

        @ProcessInput
        public void process(StreamEvent streamEvent) throws InterruptedException {
            String charBuffer = Charsets.UTF_8.decode((ByteBuffer) streamEvent.getBody()).toString();
            LOG.info(charBuffer);
            this.keyValueTable.increment(charBuffer.getBytes(Charsets.UTF_8), 1L);
        }
    }

    public void configure() {
        setName("StreamApp");
        setDescription("StreamApp");
        addStream(new Stream(AllProgramsApp.STREAM_NAME));
        createDataset("streamout", KeyValueTable.class);
        addFlow(new StreamFlow());
    }
}
