package co.cask.cdap.examples.datacleansing;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.Resources;
import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.DynamicPartitioner;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetArguments;
import co.cask.cdap.api.dataset.lib.partitioned.KVTableStatePersistor;
import co.cask.cdap.api.dataset.lib.partitioned.PartitionBatchInput;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.mapreduce.MapReduceTaskContext;
import co.cask.cdap.api.metrics.Metrics;
import com.google.common.collect.ImmutableMap;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map-only MapReduce program that checks each record of the {@code rawRecords} partitioned input
 * against a schema and writes it either to {@code cleanRecords} (when it matches) or to
 * {@code invalidRecords} (when it does not).
 *
 * <p>Input partitions are selected through {@link PartitionBatchInput}, with its consumption state
 * kept under key {@code state.key} of the {@code consumingState} dataset. The committer returned by
 * {@code setInput} is told whether the run succeeded in
 * {@link #onFinish(boolean, MapReduceContext)}.
 *
 * <p>Runtime arguments:
 * <ul>
 *   <li>{@value #OUTPUT_PARTITION_KEY} (required): a long used as the {@code time} field of every
 *       output partition key.</li>
 *   <li>{@value #SCHEMA_KEY} (optional): a JSON schema replacing
 *       {@link SchemaMatchingFilter#DEFAULT_SCHEMA}.</li>
 * </ul>
 */
public class DataCleansingMapReduce extends AbstractMapReduce {
    protected static final String NAME = "DataCleansingMapReduce";
    protected static final String OUTPUT_PARTITION_KEY = "output.partition.key";
    protected static final String SCHEMA_KEY = "schema.key";

    // Dataset names shared by beforeSubmit (which declares them) and the mapper (which writes to them),
    // so the two sides cannot drift apart.
    private static final String RAW_RECORDS = "rawRecords";
    private static final String CLEAN_RECORDS = "cleanRecords";
    private static final String INVALID_RECORDS = "invalidRecords";
    private static final String CONSUMING_STATE = "consumingState";
    private static final String CONSUMING_STATE_KEY = "state.key";

    // Set in beforeSubmit; remains null if beforeSubmit fails before PartitionBatchInput.setInput returns.
    private PartitionBatchInput.BatchPartitionCommitter partitionCommitter;

    /**
     * Mapper that routes each input line to {@code cleanRecords} when it matches the configured
     * schema and to {@code invalidRecords} otherwise, counting both outcomes as metrics.
     */
    public static class SchemaMatchingFilter extends Mapper<LongWritable, Text, NullWritable, Text>
            implements ProgramLifecycle<MapReduceTaskContext<NullWritable, Text>> {

        /** Schema used when the job configuration carries no {@code schema.key} entry. */
        public static final Schema DEFAULT_SCHEMA = Schema.recordOf("person", new Schema.Field[]{
            Schema.Field.of("pid", Schema.of(Schema.Type.LONG)),
            Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
            Schema.Field.of("dob", Schema.of(Schema.Type.STRING)),
            Schema.Field.of("zip", Schema.of(Schema.Type.INT))
        });

        private SimpleSchemaMatcher schemaMatcher;
        // NOTE(review): never assigned in this class; presumably injected by the CDAP runtime — confirm.
        private Metrics mapMetrics;
        // Multi-output writer handed over in initialize(); used instead of the Hadoop Context for writes.
        private MapReduceTaskContext<NullWritable, Text> mapReduceTaskContext;

        /**
         * Keeps the task context so records can be written to named outputs.
         *
         * @param mapReduceTaskContext the CDAP task context for this mapper
         */
        public void initialize(MapReduceTaskContext<NullWritable, Text> mapReduceTaskContext) throws Exception {
            this.mapReduceTaskContext = mapReduceTaskContext;
        }

        /** Nothing to release. */
        public void destroy() {
        }

        /**
         * Builds the schema matcher from the {@code schema.key} configuration entry, falling back to
         * {@link #DEFAULT_SCHEMA} when the entry is absent.
         *
         * @throws IOException if the configured schema JSON cannot be parsed
         */
        protected void setup(Mapper<LongWritable, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {
            String schemaJson = context.getConfiguration().get(DataCleansingMapReduce.SCHEMA_KEY);
            Schema schema = schemaJson == null ? DEFAULT_SCHEMA : Schema.parseJson(schemaJson);
            this.schemaMatcher = new SimpleSchemaMatcher(schema);
        }

        /**
         * Writes {@code text} to {@code cleanRecords} if it matches the schema, otherwise to
         * {@code invalidRecords}, and increments {@code records.valid} or {@code records.invalid}.
         *
         * @param longWritable the input key (unused)
         * @param text the raw record
         * @param mapReduceTaskContext the context used to write to the named outputs
         */
        public void map(LongWritable longWritable, Text text, MapReduceTaskContext<NullWritable, Text> mapReduceTaskContext)
                throws IOException, InterruptedException {
            if (this.schemaMatcher.matches(text.toString())) {
                mapReduceTaskContext.write(CLEAN_RECORDS, NullWritable.get(), text);
                this.mapMetrics.count("records.valid", 1);
            } else {
                mapReduceTaskContext.write(INVALID_RECORDS, NullWritable.get(), text);
                this.mapMetrics.count("records.invalid", 1);
            }
        }

        /**
         * Hadoop entry point. It delegates to the multi-output overload using the context saved in
         * {@link #initialize}. javac generates the erased bridge method itself; declaring it by hand
         * (as decompiled output does) is a name-clash compile error.
         */
        protected void map(LongWritable longWritable, Text text, Mapper<LongWritable, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {
            map(longWritable, text, this.mapReduceTaskContext);
        }
    }

    /**
     * Partitions clean records by the run's {@code time} runtime argument and the record's
     * {@code zip} field.
     */
    public static final class TimeAndZipPartitioner extends DynamicPartitioner<NullWritable, Text> {
        private long time;
        private JsonParser jsonParser;

        /**
         * Reads the partition time from the runtime arguments and creates the JSON parser.
         *
         * @throws IllegalArgumentException if {@code output.partition.key} is missing or not a long
         */
        public void initialize(MapReduceTaskContext<NullWritable, Text> mapReduceTaskContext) {
            this.time = parsePartitionTime(mapReduceTaskContext.getRuntimeArguments());
            this.jsonParser = new JsonParser();
        }

        /**
         * Builds the {@code (time, zip)} partition key for a record.
         * NOTE(review): assumes every clean record is a JSON object with an integer {@code zip} field.
         * DEFAULT_SCHEMA has one, but a custom {@code schema.key} schema might not — confirm.
         */
        public PartitionKey getPartitionKey(NullWritable nullWritable, Text text) {
            int zip = this.jsonParser.parse(text.toString()).getAsJsonObject().get("zip").getAsInt();
            return PartitionKey.builder()
                .addLongField("time", this.time)
                .addIntField("zip", zip)
                .build();
        }
    }

    /** Sets the program name and 1024 MB of resources for mappers and reducers. */
    public void configure() {
        setName(NAME);
        setMapperResources(new Resources(1024));
        // beforeSubmit sets zero reduce tasks, so this setting has no visible effect here.
        setReducerResources(new Resources(1024));
    }

    /**
     * Wires the partitioned input, both outputs, the mapper and the optional schema override.
     *
     * @throws IllegalArgumentException if {@code output.partition.key} is missing or not a long
     */
    public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
        this.partitionCommitter = PartitionBatchInput.setInput(
            mapReduceContext, RAW_RECORDS, new KVTableStatePersistor(CONSUMING_STATE, CONSUMING_STATE_KEY));

        Map<String, String> runtimeArguments = mapReduceContext.getRuntimeArguments();
        PartitionKey outputKey = PartitionKey.builder()
            .addLongField("time", parsePartitionTime(runtimeArguments))
            .build();
        Map<String, String> metadata = ImmutableMap.of("source.program", NAME);

        // Invalid records go to a single partition keyed by time only.
        Map<String, String> invalidArgs = new HashMap<String, String>();
        PartitionedFileSetArguments.setOutputPartitionKey(invalidArgs, outputKey);
        PartitionedFileSetArguments.setOutputPartitionMetadata(invalidArgs, metadata);
        mapReduceContext.addOutput(Output.ofDataset(INVALID_RECORDS, invalidArgs));

        // Clean records are partitioned per record by (time, zip).
        Map<String, String> cleanArgs = new HashMap<String, String>();
        PartitionedFileSetArguments.setDynamicPartitioner(cleanArgs, TimeAndZipPartitioner.class);
        PartitionedFileSetArguments.setOutputPartitionMetadata(cleanArgs, metadata);
        mapReduceContext.addOutput(Output.ofDataset(CLEAN_RECORDS, cleanArgs));

        Job job = (Job) mapReduceContext.getHadoopJob();
        job.setMapperClass(SchemaMatchingFilter.class);
        job.setNumReduceTasks(0);

        // Forward the optional schema override to the mappers via the job configuration.
        String schemaJson = runtimeArguments.get(SCHEMA_KEY);
        if (schemaJson != null) {
            job.getConfiguration().set(SCHEMA_KEY, schemaJson);
        }
    }

    /**
     * Passes the run outcome to the input partition committer.
     *
     * @param succeeded whether the MapReduce run succeeded
     */
    public void onFinish(boolean succeeded, MapReduceContext mapReduceContext) throws Exception {
        // partitionCommitter is null if beforeSubmit failed before setInput returned. In that case
        // there is no committer to notify, and an NPE here would hide the original failure.
        if (this.partitionCommitter != null) {
            this.partitionCommitter.onFinish(succeeded);
        }
    }

    /**
     * Reads the required {@value #OUTPUT_PARTITION_KEY} runtime argument as a long.
     *
     * @param runtimeArguments the program's runtime arguments
     * @return the parsed partition time
     * @throws IllegalArgumentException if the argument is absent or is not a valid long
     */
    private static long parsePartitionTime(Map<String, String> runtimeArguments) {
        String value = runtimeArguments.get(OUTPUT_PARTITION_KEY);
        if (value == null) {
            throw new IllegalArgumentException("Missing required runtime argument '" + OUTPUT_PARTITION_KEY + "'");
        }
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Runtime argument '" + OUTPUT_PARTITION_KEY + "' must be a long, but was '" + value + "'", e);
        }
    }
}
