package co.cask.cdap.conversion.app;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.data.stream.StreamBatchReadable;
import co.cask.cdap.api.dataset.lib.FileSetArguments;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSetArguments;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.stream.GenericStreamEventData;
import co.cask.cdap.conversion.avro.Converter;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/conversion/app/StreamConversionMapReduce.class */
public class StreamConversionMapReduce extends AbstractMapReduce {
    private static final Logger LOG = LoggerFactory.getLogger(StreamConversionMapReduce.class);
    public static final String ADAPTER_PROPERTIES = "adapter.properties";
    public static final String SCHEMA_KEY = "cdap.stream.conversion.output.schema";
    public static final String HEADERS_KEY = "cdap.stream.conversion.headers";
    private String sinkName;
    private String outputPath;
    private Long partitionTime;

    /* loaded from: input_file:co/cask/cdap/conversion/app/StreamConversionMapReduce$StreamConversionMapper.class */
    public static class StreamConversionMapper extends Mapper<LongWritable, GenericStreamEventData<Object>, AvroKey<GenericRecord>, NullWritable> {
        private Schema schema;
        private String[] headers;
        private Converter converter;

        protected void setup(Mapper<LongWritable, GenericStreamEventData<Object>, AvroKey<GenericRecord>, NullWritable>.Context context) throws IOException, InterruptedException {
            this.schema = new Schema.Parser().parse(context.getConfiguration().get(StreamConversionMapReduce.SCHEMA_KEY));
            String str = context.getConfiguration().get(StreamConversionMapReduce.HEADERS_KEY);
            this.headers = str == null ? new String[0] : str.split(",");
            this.converter = new Converter(this.schema, this.headers);
        }

        public void map(LongWritable longWritable, GenericStreamEventData<Object> genericStreamEventData, Mapper<LongWritable, GenericStreamEventData<Object>, AvroKey<GenericRecord>, NullWritable>.Context context) throws IOException, InterruptedException {
            context.write(new AvroKey(this.converter.convert(genericStreamEventData.getBody(), longWritable.get(), (Map) Objects.firstNonNull(genericStreamEventData.getHeaders(), ImmutableMap.of()))), NullWritable.get());
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (GenericStreamEventData<Object>) obj2, (Mapper<LongWritable, GenericStreamEventData<Object>, AvroKey<GenericRecord>, NullWritable>.Context) context);
        }
    }

    public void configure() {
        setDescription("Job to read a chunk of stream events and write them to a FileSet");
        setMapperResources(new Resources(512));
    }

    public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
        Job job = (Job) mapReduceContext.getHadoopJob();
        job.setMapperClass(StreamConversionMapper.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(AvroKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        AdapterArguments adapterArguments = new AdapterArguments(mapReduceContext.getRuntimeArguments());
        this.sinkName = adapterArguments.getSinkName();
        this.partitionTime = Long.valueOf(mapReduceContext.getLogicalStartTime());
        long logicalStartTime = mapReduceContext.getLogicalStartTime();
        StreamBatchReadable.useStreamInput(mapReduceContext, adapterArguments.getSourceName(), logicalStartTime - adapterArguments.getFrequency(), logicalStartTime, adapterArguments.getSourceFormatSpec());
        mapReduceContext.setMapperResources(adapterArguments.getMapperResources());
        HashMap newHashMap = Maps.newHashMap();
        TimePartitionedFileSetArguments.setOutputPartitionTime(newHashMap, this.partitionTime.longValue());
        TimePartitionedFileSet dataset = mapReduceContext.getDataset(this.sinkName, newHashMap);
        this.outputPath = FileSetArguments.getOutputPath(dataset.getEmbeddedFileSet().getRuntimeArguments());
        mapReduceContext.setOutput(this.sinkName, dataset);
        AvroJob.setOutputKeySchema(job, adapterArguments.getSinkSchema());
        job.getConfiguration().set(SCHEMA_KEY, adapterArguments.getSinkSchema().toString());
        if (adapterArguments.getHeadersStr() != null) {
            job.getConfiguration().set(HEADERS_KEY, adapterArguments.getHeadersStr());
        }
        job.setJobName("adapter.stream-conversion." + adapterArguments.getSourceName() + ".to." + this.sinkName + "." + this.partitionTime);
    }

    public void onFinish(boolean z, MapReduceContext mapReduceContext) throws Exception {
        if (z) {
            TimePartitionedFileSet dataset = mapReduceContext.getDataset(this.sinkName);
            LOG.info("Adding partition for time {} with path {} to dataset '{}'", new Object[]{this.partitionTime, this.outputPath, this.sinkName});
            dataset.addPartition(this.partitionTime.longValue(), this.outputPath);
        }
    }
}
