package co.cask.cdap.batch.stream;

import co.cask.cdap.api.annotation.Batch;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.flow.AbstractFlow;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/batch/stream/TestBatchStreamIntegrationApp.class */
public class TestBatchStreamIntegrationApp extends AbstractApplication {
    private static final Logger LOG = LoggerFactory.getLogger(TestBatchStreamIntegrationApp.class);

    /* loaded from: input_file:co/cask/cdap/batch/stream/TestBatchStreamIntegrationApp$StreamReader.class */
    /**
     * Flowlet that consumes stream events in batches and checks that batching actually happened.
     */
    public static class StreamReader extends AbstractFlowlet {
        /**
         * Processes one batch of stream events.
         *
         * <p>Logs the number of events in the batch, asserts that the batch holds more than one
         * event, then parses every event body as a UTF-8 encoded decimal integer and logs the
         * parsed values.
         *
         * @param it iterator over the events delivered for this invocation
         * @throws NumberFormatException if an event body does not parse as an integer
         * @throws AssertionError if the batch holds fewer than two events
         */
        @ProcessInput
        @Batch(100)
        public void foo(Iterator<StreamEvent> it) {
            ImmutableList<StreamEvent> events = ImmutableList.copyOf(it);
            LOG.warn("Number of batched stream events = {}", events.size());
            Assert.assertTrue(events.size() > 1);

            ArrayList<Integer> values = Lists.newArrayListWithCapacity(events.size());
            for (StreamEvent event : events) {
                // Charset.decode advances the buffer it reads from; work on a duplicate so the
                // event's own body keeps its position and can be read again.
                ByteBuffer body = event.getBody().duplicate();
                values.add(Integer.valueOf(Charsets.UTF_8.decode(body).toString()));
            }
            LOG.info("Read events={}", values);
        }
    }

    /* loaded from: input_file:co/cask/cdap/batch/stream/TestBatchStreamIntegrationApp$StreamTestBatch.class */
    public static class StreamTestBatch extends AbstractMapReduce {
        public void configure() {
            useStreamInput("s_1");
            setOutputDataset("results");
        }

        public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
            Job job = (Job) mapReduceContext.getHadoopJob();
            setMapperClass(job);
            job.setReducerClass(StreamTestBatchReducer.class);
        }

        protected void setMapperClass(Job job) {
            job.setMapperClass(StreamTestBatchMapper.class);
        }
    }

    /* loaded from: input_file:co/cask/cdap/batch/stream/TestBatchStreamIntegrationApp$StreamTestBatchIdDecoder.class */
    /**
     * Variant of {@link StreamTestBatch} whose mapper receives {@link StreamEvent} values
     * instead of {@link BytesWritable} values.
     */
    public static class StreamTestBatchIdDecoder extends StreamTestBatch {
        /**
         * Installs {@link StreamTestBatchIdDecoderMapper} as the job's mapper.
         *
         * @param job the Hadoop job to configure
         */
        @Override
        protected void setMapperClass(Job job) {
            job.setMapperClass(StreamTestBatchIdDecoderMapper.class);
        }
    }

    /* loaded from: input_file:co/cask/cdap/batch/stream/TestBatchStreamIntegrationApp$StreamTestBatchIdDecoderMapper.class */
    public static class StreamTestBatchIdDecoderMapper extends Mapper<LongWritable, StreamEvent, Text, Text> {
        protected void map(LongWritable longWritable, StreamEvent streamEvent, Mapper<LongWritable, StreamEvent, Text, Text>.Context context) throws IOException, InterruptedException {
            Text text = new Text(((ByteBuffer) streamEvent.getBody()).array());
            context.write(text, text);
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (StreamEvent) obj2, (Mapper<LongWritable, StreamEvent, Text, Text>.Context) context);
        }
    }

    /* loaded from: input_file:co/cask/cdap/batch/stream/TestBatchStreamIntegrationApp$StreamTestBatchMapper.class */
    public static class StreamTestBatchMapper extends Mapper<LongWritable, BytesWritable, Text, Text> {
        protected void map(LongWritable longWritable, BytesWritable bytesWritable, Mapper<LongWritable, BytesWritable, Text, Text>.Context context) throws IOException, InterruptedException {
            Text text = new Text(bytesWritable.copyBytes());
            context.write(text, text);
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (BytesWritable) obj2, (Mapper<LongWritable, BytesWritable, Text, Text>.Context) context);
        }
    }

    /* loaded from: input_file:co/cask/cdap/batch/stream/TestBatchStreamIntegrationApp$StreamTestBatchReducer.class */
    public static class StreamTestBatchReducer extends Reducer<Text, Text, byte[], byte[]> {
        protected void reduce(Text text, Iterable<Text> iterable, Reducer<Text, Text, byte[], byte[]>.Context context) throws IOException, InterruptedException {
            Iterator<Text> it = iterable.iterator();
            while (it.hasNext()) {
                byte[] copyBytes = it.next().copyBytes();
                context.write(copyBytes, copyBytes);
            }
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<Text>) iterable, (Reducer<Text, Text, byte[], byte[]>.Context) context);
        }
    }

    /* loaded from: input_file:co/cask/cdap/batch/stream/TestBatchStreamIntegrationApp$StreamTestFlow.class */
    public static class StreamTestFlow extends AbstractFlow {
        protected void configureFlow() {
            setName("StreamTestFlow");
            setDescription("Flow for testing batch stream dequeue");
            addFlowlet(new StreamReader());
            connectStream("s_1", "StreamReader");
        }
    }

    /**
     * Declares the application: its name, the input stream {@code s_1}, the {@code results}
     * key-value dataset, the test flow and both MapReduce programs.
     */
    @Override
    public void configure() {
        setName("TestFlowStreamIntegrationApp");

        // Data: input stream and output dataset.
        Stream inputStream = new Stream("s_1");
        addStream(inputStream);
        createDataset("results", KeyValueTable.class);

        // Programs: one flow and two MapReduce variants.
        addFlow(new StreamTestFlow());
        addMapReduce(new StreamTestBatch());
        addMapReduce(new StreamTestBatchIdDecoder());
    }
}
