package co.cask.cdap.dq;

import ch.qos.logback.core.net.SyslogConstants;
import co.cask.cdap.api.Config;
import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceConfigurer;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.plugin.PluginProperties;
import co.cask.cdap.api.schedule.Schedules;
import co.cask.cdap.api.stream.GenericStreamEventData;
import co.cask.cdap.dq.functions.BasicAggregationFunction;
import co.cask.cdap.dq.functions.CombinableAggregationFunction;
import co.cask.cdap.dq.rowkey.AggregationsRowKey;
import co.cask.cdap.dq.rowkey.ValuesRowKey;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.batch.mapreduce.MapReduceSourceContext;
import co.cask.cdap.etl.common.DatasetContextLookupProvider;
import co.cask.cdap.etl.planner.StageInfo;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/dq/DataQualityApp.class */
public class DataQualityApp extends AbstractApplication<DataQualityConfig> {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(DataQualityApp.class);
    /** Gson instance used to serialize and parse the {@code fieldAggregations} program property. */
    private static final Gson GSON = new Gson();
    /**
     * Reflective {@link Type} for {@code Map<String, Set<String>>}; captured through an anonymous
     * {@link TypeToken} subclass so Gson can deserialize the generic map with its element types.
     */
    private static final Type TOKEN_TYPE_MAP_STRING_SET_STRING = new TypeToken<Map<String, Set<String>>>() { // from class: co.cask.cdap.dq.DataQualityApp.1
    }.getType();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: co.cask.cdap.dq.DataQualityApp$2, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/dq/DataQualityApp$2.class */
    /**
     * Lookup table for a {@code switch} over {@link Schema.Type}: the array is indexed by
     * {@code Schema.Type.ordinal()} and holds the dense case label (1-7) assigned to that constant.
     * Constants that are not registered below keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        // Sized to the number of Schema.Type constants present at class-initialization time.
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type = new int[Schema.Type.values().length];

        static {
            // Each constant is registered in its own try block so that a constant that is missing
            // at runtime (NoSuchFieldError) only leaves its own slot unmapped.
            try {
                $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[Schema.Type.STRING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[Schema.Type.INT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[Schema.Type.LONG.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[Schema.Type.DOUBLE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[Schema.Type.FLOAT.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[Schema.Type.BOOLEAN.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[Schema.Type.BYTES.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/dq/DataQualityApp$AggregationMapper.class */
    public static class AggregationMapper extends Mapper<LongWritable, GenericStreamEventData<StructuredRecord>, Text, DataQualityWritable> implements ProgramLifecycle<MapReduceContext> {
        private Set<String> fieldsSet = new HashSet();

        public void initialize(MapReduceContext mapReduceContext) throws Exception {
            Map map = (Map) DataQualityApp.GSON.fromJson(mapReduceContext.getSpecification().getProperty("fieldAggregations"), DataQualityApp.TOKEN_TYPE_MAP_STRING_SET_STRING);
            if (map != null) {
                this.fieldsSet = map.keySet();
            }
        }

        public void map(LongWritable longWritable, GenericStreamEventData<StructuredRecord> genericStreamEventData, Mapper<LongWritable, GenericStreamEventData<StructuredRecord>, Text, DataQualityWritable>.Context context) throws IOException, InterruptedException {
            StructuredRecord structuredRecord = (StructuredRecord) genericStreamEventData.getBody();
            for (Schema.Field field : structuredRecord.getSchema().getFields()) {
                String name = field.getName();
                Object obj = structuredRecord.get(name);
                if (obj != null) {
                    DataQualityWritable outputValue = field.getSchema().isNullable() ? getOutputValue(field.getSchema().getNonNullable().getType(), obj) : getOutputValue(field.getSchema().getType(), obj);
                    if (outputValue != null && (this.fieldsSet.contains(name) || this.fieldsSet.isEmpty())) {
                        context.write(new Text(name), outputValue);
                    }
                }
            }
        }

        /* JADX WARN: Failed to find 'out' block for switch in B:2:0x0010. Please report as an issue. */
        @Nullable
        private DataQualityWritable getOutputValue(Schema.Type type, Object obj) {
            DataQualityWritable dataQualityWritable = new DataQualityWritable();
            switch (AnonymousClass2.$SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[type.ordinal()]) {
                case 1:
                    dataQualityWritable.set(new Text((String) obj));
                    return dataQualityWritable;
                case 2:
                    dataQualityWritable.set(new IntWritable(((Integer) obj).intValue()));
                    return dataQualityWritable;
                case SyslogConstants.ERROR_SEVERITY /* 3 */:
                    dataQualityWritable.set(new LongWritable(((Long) obj).longValue()));
                    return dataQualityWritable;
                case 4:
                    dataQualityWritable.set(new DoubleWritable(((Double) obj).doubleValue()));
                    return dataQualityWritable;
                case 5:
                    dataQualityWritable.set(new FloatWritable(((Float) obj).floatValue()));
                    return dataQualityWritable;
                case SyslogConstants.INFO_SEVERITY /* 6 */:
                    dataQualityWritable.set(new BooleanWritable(((Boolean) obj).booleanValue()));
                    return dataQualityWritable;
                case SyslogConstants.DEBUG_SEVERITY /* 7 */:
                    dataQualityWritable.set(new ByteWritable(((Byte) obj).byteValue()));
                    return dataQualityWritable;
                default:
                    return null;
            }
        }

        public void destroy() {
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (GenericStreamEventData<StructuredRecord>) obj2, (Mapper<LongWritable, GenericStreamEventData<StructuredRecord>, Text, DataQualityWritable>.Context) context);
        }
    }

    /* loaded from: input_file:co/cask/cdap/dq/DataQualityApp$AggregationReducer.class */
    public static class AggregationReducer extends Reducer<Text, DataQualityWritable, byte[], Put> implements ProgramLifecycle<MapReduceContext> {
        private static final Gson GSON = new Gson();
        private String sourceId;
        private Map<String, Set<String>> fieldAggregations;
        long timeKey = 0;

        public void initialize(MapReduceContext mapReduceContext) throws Exception {
            this.timeKey = mapReduceContext.getLogicalStartTime();
            this.sourceId = mapReduceContext.getSpecification().getProperty("sourceId");
            this.fieldAggregations = (Map) GSON.fromJson(mapReduceContext.getSpecification().getProperty("fieldAggregations"), DataQualityApp.TOKEN_TYPE_MAP_STRING_SET_STRING);
        }

        public void reduce(Text text, Iterable<DataQualityWritable> iterable, Reducer<Text, DataQualityWritable, byte[], Put>.Context context) throws IOException, InterruptedException {
            DataQualityApp.LOG.trace("timestamp: {}", Long.valueOf(this.timeKey));
            Set<String> set = this.fieldAggregations.get(text.toString());
            ArrayList arrayList = new ArrayList();
            AggregationsRowKey aggregationsRowKey = new AggregationsRowKey(this.timeKey, this.sourceId);
            byte[] bytes = Bytes.toBytes(text.toString());
            for (String str : set) {
                try {
                    BasicAggregationFunction basicAggregationFunction = (BasicAggregationFunction) Class.forName("co.cask.cdap.dq.functions." + str).newInstance();
                    boolean z = basicAggregationFunction instanceof CombinableAggregationFunction;
                    Iterator<DataQualityWritable> it = iterable.iterator();
                    while (it.hasNext()) {
                        basicAggregationFunction.add(it.next());
                    }
                    ValuesRowKey valuesRowKey = new ValuesRowKey(this.timeKey, text.toString(), this.sourceId);
                    context.write(valuesRowKey.getTableRowKey(), new Put(valuesRowKey.getTableRowKey(), Bytes.toBytes(str), basicAggregationFunction.aggregate()));
                    arrayList.add(new AggregationTypeValue(str, z));
                } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | RuntimeException e) {
                    throw new RuntimeException(e);
                }
            }
            context.write(aggregationsRowKey.getTableRowKey(), new Put(aggregationsRowKey.getTableRowKey(), bytes, Bytes.toBytes(GSON.toJson(arrayList))));
        }

        public void destroy() {
        }

        public /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<DataQualityWritable>) iterable, (Reducer<Text, DataQualityWritable, byte[], Put>.Context) context);
        }
    }

    /* loaded from: input_file:co/cask/cdap/dq/DataQualityApp$DataQualityConfig.class */
    /**
     * Application configuration: how often the workflow runs, which source to read, where results
     * are stored, and which aggregations to compute per field.
     */
    public static class DataQualityConfig extends Config {
        /** Interval, in minutes, between scheduled workflow runs. */
        private int workflowScheduleMinutes;
        /** Description of the source plugin to read from. */
        private DataQualitySource source;
        /** Name of the output dataset. */
        private String datasetName;
        /** Aggregation type names keyed by field name. */
        private Map<String, Set<String>> fieldAggregations;

        /**
         * Creates a fully populated configuration.
         *
         * @param workflowScheduleMinutes interval, in minutes, between workflow runs
         * @param source source plugin description
         * @param datasetName name of the output dataset
         * @param fieldAggregations aggregation type names keyed by field name
         */
        public DataQualityConfig(int workflowScheduleMinutes,
                                 DataQualitySource source,
                                 String datasetName,
                                 Map<String, Set<String>> fieldAggregations) {
            this.fieldAggregations = fieldAggregations;
            this.datasetName = datasetName;
            this.source = source;
            this.workflowScheduleMinutes = workflowScheduleMinutes;
        }
    }

    /* loaded from: input_file:co/cask/cdap/dq/DataQualityApp$FieldAggregator.class */
    public static final class FieldAggregator extends AbstractMapReduce {
        private static final String PLUGIN_ID = "input";
        private final String datasetName;
        private final Map<String, Set<String>> fieldAggregations;
        private DataQualitySource source;
        private Metrics metrics;

        public FieldAggregator(DataQualitySource dataQualitySource, String str, Map<String, Set<String>> map) {
            this.source = dataQualitySource;
            this.datasetName = str;
            this.fieldAggregations = map;
        }

        public void configure() {
            super.configure();
            MapReduceConfigurer configurer = getConfigurer();
            BatchSource batchSource = (BatchSource) usePlugin(BatchSource.PLUGIN_TYPE, this.source.getName(), PLUGIN_ID, PluginProperties.builder().addAll(this.source.getProperties()).build());
            Preconditions.checkNotNull(batchSource, "Could not find plugin %s of type 'source'", new Object[]{this.source.getName()});
            batchSource.configurePipeline(new MapReducePipelineConfigurer(configurer, PLUGIN_ID));
            setName("FieldAggregator");
            setProperties(ImmutableMap.builder().put("fieldAggregations", DataQualityApp.GSON.toJson(this.fieldAggregations)).put("sourceId", this.source.getId()).put("sourceName", this.source.getName()).put("datasetName", this.datasetName).build());
        }

        public void initialize() throws Exception {
            MapReduceContext context = getContext();
            Job job = (Job) context.getHadoopJob();
            job.setMapperClass(AggregationMapper.class);
            job.setReducerClass(AggregationReducer.class);
            BatchSource batchSource = (BatchSource) context.newPluginInstance(PLUGIN_ID);
            String str = "batchsource:" + context.getSpecification().getProperty("sourceName") + ":0";
            batchSource.prepareRun(new MapReduceSourceContext(context, this.metrics, new DatasetContextLookupProvider(context), context.getRuntimeArguments(), StageInfo.builder(str, BatchSource.PLUGIN_TYPE).build(), context.getDataTracer(str).isEnabled()));
            context.addOutput(Output.ofDataset(context.getSpecification().getProperty("datasetName")));
        }
    }

    /**
     * Validates the application configuration and registers the MapReduce program, output
     * dataset, service, workflow and its time schedule.
     *
     * @throws IllegalArgumentException if any configuration check fails
     */
    @Override
    public void configure() {
        DataQualityConfig config = getContext().getConfig();
        Preconditions.checkArgument(config.workflowScheduleMinutes > 0, "Workflow Frequency in minutes (>0) should be provided");
        Preconditions.checkArgument(config.source != null, "Configuration for DataQualityApp Source is missing");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(config.source.getName()), "Data Quality source name should not be null or empty");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(config.source.getId()), "Data Quality source id should not be null or empty");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(config.datasetName), "Output Dataset name should be not be null or empty");
        Preconditions.checkArgument(config.fieldAggregations != null, "fieldAggregations needs to be specified");
        Preconditions.checkArgument(!config.fieldAggregations.isEmpty(), "fieldAggregations should not be empty");

        // A single named field with a non-empty aggregation set is enough to proceed.
        boolean hasUsableField = false;
        for (Map.Entry<String, Set<String>> fieldEntry : config.fieldAggregations.entrySet()) {
            if (Strings.isNullOrEmpty(fieldEntry.getKey()) || fieldEntry.getValue().isEmpty()) {
                continue;
            }
            hasUsableField = true;
            break;
        }
        Preconditions.checkArgument(hasUsableField, "At least one field with one or more aggregations must be provided");

        int scheduleMinutes = config.workflowScheduleMinutes;
        addMapReduce(new FieldAggregator(config.source, config.datasetName, config.fieldAggregations));
        setName("DataQualityApp");
        setDescription("Application with MapReduce job to determine the data quality in a Batch Source");
        createDataset(config.datasetName, Table.class);
        addService(new DataQualityService(config.datasetName));
        addWorkflow(new DataQualityWorkflow());
        // The interval is used as the step of the cron minute field.
        String scheduleDescription = "Schedule execution every " + scheduleMinutes + " min";
        String cronExpression = "*/" + scheduleMinutes + " * * * *";
        scheduleWorkflow(Schedules.builder("aggregatorSchedule").setDescription(scheduleDescription).createTimeSchedule(cronExpression), "DataQualityWorkflow");
    }
}
