package org.apache.gobblin.compaction.mapreduce.avro;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Locale;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.commons.io.FilenameUtils;
import org.apache.gobblin.compaction.dataset.Dataset;
import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
import org.apache.gobblin.util.AvroUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.class */
public class MRCompactorAvroKeyDedupJobRunner extends MRCompactorJobRunner {
    /** Common prefix of the compaction job property keys declared below. */
    private static final String COMPACTION_JOB_PREFIX = "compaction.job.";
    /** Boolean job property read by the constructor (default {@code true}); gates whether an input key schema is set on the job. */
    public static final String COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA = "compaction.job.avro.single.input.schema";
    /** Job property holding the path of a key schema file; consulted when the dedup key option is {@link DedupKeyOption#CUSTOM}. */
    public static final String COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC = "compaction.job.avro.key.schema.loc";
    /** Job property naming the {@link DedupKeyOption} to use; matched case-insensitively against the enum constants. */
    public static final String COMPACTION_JOB_DEDUP_KEY = "compaction.job.dedup.key";
    /** File extension of the data files this runner reads schemas from and declares as applicable. */
    private static final String AVRO = "avro";
    /** A non-record field counts as a key field when its doc string, lower-cased, ends with this marker. */
    private static final String SCHEMA_DEDUP_FIELD_ANNOTATOR = "primarykey";
    /** Value of {@link #COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA}, captured once at construction. */
    private final boolean useSingleInputSchema;
    private static final Logger LOG = LoggerFactory.getLogger(MRCompactorAvroKeyDedupJobRunner.class);
    /** Option used when {@link #COMPACTION_JOB_DEDUP_KEY} is absent or does not name a known option. */
    public static final DedupKeyOption DEFAULT_DEDUP_KEY_OPTION = DedupKeyOption.KEY;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner$1.class */
    /**
     * Switch lookup table (marked synthetic by the decompiler) that backs the
     * {@code switch} over {@link Schema.Type} in {@code getKeySchema(Schema.Field)}.
     * The table is indexed by enum ordinal; only {@code RECORD} is mapped (to case 1),
     * every other type keeps the array default of 0 and therefore hits the
     * {@code default} branch of that switch.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per Schema.Type constant, indexed by ordinal().
        static final /* synthetic */ int[] $SwitchMap$org$apache$avro$Schema$Type = new int[Schema.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$avro$Schema$Type[Schema.Type.RECORD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: if the constant cannot be resolved the slot stays 0
                // and the switch simply falls through to its default branch.
            }
        }
    }

    /* loaded from: input_file:org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner$DedupKeyOption.class */
    /**
     * Selects which attributes of a record form the deduplication key.
     * The choice is read from the {@code compaction.job.dedup.key} job property.
     */
    public enum DedupKeyOption {
        /** Use the whole record schema, after removing uncomparable fields. */
        ALL,
        /** Use only the fields annotated as primary key (falls back to all fields when none is annotated). */
        KEY,
        /** Use the schema stored in the file named by {@code compaction.job.avro.key.schema.loc}. */
        CUSTOM
    }

    /* loaded from: input_file:org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner$LastModifiedDescComparator.class */
    /**
     * Orders {@link FileStatus} entries by modification time, most recently
     * modified first. Entries with equal modification times compare as equal.
     */
    public static class LastModifiedDescComparator implements Comparator<FileStatus>, Serializable {
        private static final long serialVersionUID = 1;

        /**
         * @param fileStatus  left-hand entry
         * @param fileStatus2 right-hand entry
         * @return -1 when {@code fileStatus} is newer, 1 when it is older, 0 when both
         *         carry the same modification time
         */
        @Override // java.util.Comparator
        public int compare(FileStatus fileStatus, FileStatus fileStatus2) {
            long leftModified = fileStatus.getModificationTime();
            long rightModified = fileStatus2.getModificationTime();
            if (leftModified == rightModified) {
                return 0;
            }
            // Descending order: the newer (larger timestamp) entry sorts first.
            return leftModified > rightModified ? -1 : 1;
        }
    }

    /**
     * Creates a runner for the given dataset and captures the
     * {@code compaction.job.avro.single.input.schema} job property.
     *
     * @param dataset    dataset to compact; passed to the superclass constructor
     * @param fileSystem file system passed to the superclass constructor
     */
    public MRCompactorAvroKeyDedupJobRunner(Dataset dataset, FileSystem fileSystem) {
        super(dataset, fileSystem);
        // Defaults to true when the property is not set.
        this.useSingleInputSchema = this.dataset.jobProps().getPropAsBoolean(COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA, true);
    }

    /**
     * Applies the superclass job configuration and then the Avro schemas this runner needs.
     *
     * @param job the MapReduce job being configured
     * @throws IOException if the superclass configuration or the schema lookup fails
     */
    @Override // org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner
    protected void configureJob(Job job) throws IOException {
        super.configureJob(job);
        // Schema setup runs after the base configuration has been applied.
        configureSchema(job);
    }

    /**
     * Sets the Avro input, map-output and output schemas on the job, all derived
     * from the newest schema found under the job's input paths.
     *
     * <p>The map-output key schema is the dedup key schema when deduplication is
     * enabled, otherwise the full record schema.
     *
     * @param job the MapReduce job being configured
     * @throws IOException          if listing the input paths or reading a schema fails
     * @throws NullPointerException if no Avro data file (and hence no schema) is found
     */
    private void configureSchema(Job job) throws IOException {
        Schema newestSchema = getNewestSchemaFromSource(job, this.fs);
        // The lookup returns null when no .avro file exists under the inputs; fail here
        // with context instead of deep inside AvroJob with an anonymous null dereference.
        Preconditions.checkNotNull(newestSchema,
                "No Avro schema could be found under the input paths of job %s", job.getJobName());

        if (this.useSingleInputSchema) {
            AvroJob.setInputKeySchema(job, newestSchema);
        }
        Schema mapOutputKeySchema = this.shouldDeduplicate ? getKeySchema(job, newestSchema) : newestSchema;
        AvroJob.setMapOutputKeySchema(job, mapOutputKeySchema);
        AvroJob.setMapOutputValueSchema(job, newestSchema);
        AvroJob.setOutputKeySchema(job, newestSchema);
    }

    /**
     * Picks the schema used as the deduplication key, according to the configured
     * {@link DedupKeyOption}:
     * <ul>
     *   <li>{@code ALL}: the record schema with uncomparable fields removed;</li>
     *   <li>{@code KEY}: the primary-key attributes of the record schema;</li>
     *   <li>{@code CUSTOM}: the schema parsed from the configured key schema file. When no
     *       file is configured, the file cannot be parsed, or the parsed schema is not
     *       compatible with the record schema, the primary-key attributes are used instead.</li>
     * </ul>
     *
     * @param job    the job being configured (not consulted by this method)
     * @param schema the record schema of the data being compacted
     * @return the schema to use as the map-output key
     * @throws IOException declared for callers; parse failures of the key schema file are
     *                     logged and handled by falling back rather than propagated
     */
    @VisibleForTesting
    Schema getKeySchema(Job job, Schema schema) throws IOException {
        DedupKeyOption dedupKeyOption = getDedupKeyOption();
        if (dedupKeyOption == DedupKeyOption.ALL) {
            LOG.info("Using all attributes in the schema (except Map, Array and Enum fields) for compaction");
            return (Schema) AvroUtils.removeUncomparableFields(schema).get();
        }
        if (dedupKeyOption == DedupKeyOption.KEY) {
            LOG.info("Using key attributes in the schema for compaction");
            return comparableKeyAttributes(schema);
        }

        // Remaining option is CUSTOM.
        if (!keySchemaFileSpecified()) {
            LOG.info("Property " + COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC
                    + " not provided. Using key attributes in the schema for compaction");
            return comparableKeyAttributes(schema);
        }

        Path keySchemaFile = getKeySchemaFile();
        LOG.info("Using attributes specified in schema file " + keySchemaFile + " for compaction");
        Schema customKeySchema;
        try {
            customKeySchema = AvroUtils.parseSchemaFromFile(keySchemaFile, this.fs);
        } catch (IOException e) {
            // Best-effort: keep the job going with the annotated key, but record why the file was rejected.
            LOG.error("Failed to parse avro schema from " + keySchemaFile
                    + ", using key attributes in the schema for compaction", e);
            return comparableKeyAttributes(schema);
        }

        if (!isKeySchemaValid(customKeySchema, schema)) {
            LOG.warn(String.format("Key schema %s is not compatible with record schema %s.", customKeySchema, schema)
                    + " Using key attributes in the schema for compaction");
            return comparableKeyAttributes(schema);
        }
        return customKeySchema;
    }

    /** Returns the primary-key attributes of {@code schema} with uncomparable fields removed. */
    private static Schema comparableKeyAttributes(Schema schema) {
        return (Schema) AvroUtils.removeUncomparableFields(getKeySchema(schema)).get();
    }

    /**
     * Derives the key schema of a record from its primary-key annotated fields.
     *
     * @param schema a schema of type {@code RECORD}
     * @return a record schema holding only the annotated key fields, or {@code schema}
     *         itself when no field is annotated
     * @throws IllegalArgumentException if {@code schema} is not a record
     */
    public static Schema getKeySchema(Schema schema) {
        Preconditions.checkArgument(schema.getType() == Schema.Type.RECORD);
        Optional<Schema> annotatedKey = getKeySchemaFromRecord(schema);
        if (!annotatedKey.isPresent()) {
            // Nothing is annotated: dedup on the full record instead.
            LOG.warn(String.format("No field in the schema of %s is annotated as primarykey. Using all fields for deduping", schema.getName()));
            return schema;
        }
        return annotatedKey.get();
    }

    /**
     * Determines the key schema contributed by a single field.
     *
     * <p>A record-typed field contributes the key schema of the nested record. Any other
     * field contributes its own schema when its doc string ends with {@code primarykey}
     * (compared case-insensitively), and nothing otherwise.
     *
     * @param field the field to inspect
     * @return the key schema for the field, or absent if the field is not part of the key
     */
    public static Optional<Schema> getKeySchema(Schema.Field field) {
        Schema fieldSchema = field.schema();
        if (fieldSchema.getType() == Schema.Type.RECORD) {
            return getKeySchemaFromRecord(fieldSchema);
        }
        String doc = field.doc();
        // Locale.ROOT keeps the match independent of the JVM default locale
        // (e.g. Turkish lower-casing of 'I' would otherwise break "PRIMARYKEY").
        if (doc != null && doc.toLowerCase(Locale.ROOT).endsWith(SCHEMA_DEDUP_FIELD_ANNOTATOR)) {
            return Optional.of(fieldSchema);
        }
        return Optional.absent();
    }

    /**
     * Builds a record schema containing only the key fields of the given record.
     * Each retained field keeps its name, doc and default value; its schema is the key
     * schema reported by {@code getKeySchema(Schema.Field)}.
     *
     * @param schema a schema of type {@code RECORD}
     * @return the key record schema, or absent when no field of the record is a key field
     * @throws IllegalArgumentException if {@code schema} is not a record
     */
    public static Optional<Schema> getKeySchemaFromRecord(Schema schema) {
        Preconditions.checkArgument(schema.getType() == Schema.Type.RECORD);

        ArrayList<Schema.Field> keyFields = new ArrayList<Schema.Field>();
        for (Schema.Field candidate : schema.getFields()) {
            Optional<Schema> candidateKey = getKeySchema(candidate);
            if (!candidateKey.isPresent()) {
                continue;
            }
            keyFields.add(new Schema.Field(candidate.name(), candidateKey.get(), candidate.doc(), candidate.defaultValue()));
        }

        if (!keyFields.isEmpty()) {
            // NOTE(review): the record name is passed twice; the third createRecord argument
            // looks like the namespace slot — confirm this is intentional.
            Schema keyRecord = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getName(), false);
            keyRecord.setFields(keyFields);
            return Optional.of(keyRecord);
        }
        return Optional.absent();
    }

    /**
     * Checks that a key schema can be used to read data written with the record schema.
     *
     * @param schema  the key schema, passed as the reader side of the compatibility check
     * @param schema2 the record schema, passed as the writer side of the compatibility check
     * @return {@code true} only when the check reports {@code COMPATIBLE}
     */
    public static boolean isKeySchemaValid(Schema schema, Schema schema2) {
        SchemaCompatibility.SchemaCompatibilityType outcome =
                SchemaCompatibility.checkReaderWriterCompatibility(schema, schema2).getType();
        return outcome.equals(SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE);
    }

    /**
     * Finds the schema of the most recently modified Avro data file under the job's
     * input paths.
     *
     * <p>The direct children of every input path are pooled and visited newest first;
     * the first child that yields a schema wins.
     *
     * @param job        job whose input paths are searched
     * @param fileSystem file system used to list the paths
     * @return the schema found, or {@code null} when no child yields one
     * @throws IOException if listing a path or reading a data file fails
     */
    public static Schema getNewestSchemaFromSource(Job job, FileSystem fileSystem) throws IOException {
        ArrayList<FileStatus> candidates = new ArrayList<FileStatus>();
        for (Path inputPath : FileInputFormat.getInputPaths(job)) {
            Collections.addAll(candidates, fileSystem.listStatus(inputPath));
        }
        Collections.sort(candidates, new LastModifiedDescComparator());

        for (FileStatus candidate : candidates) {
            Schema found = getNewestSchemaFromSource(candidate.getPath(), fileSystem);
            if (found != null) {
                return found;
            }
        }
        return null;
    }

    /**
     * Searches a path, newest entries first, for an Avro data file and returns its schema.
     *
     * <p>Directories are searched recursively. The first file with the {@code avro}
     * extension ends the search, whatever {@code AvroUtils.getSchemaFromDataFile}
     * returns for it; other files are skipped.
     *
     * @param path       file or directory to search
     * @param fileSystem file system used to list the path
     * @return the schema of the first Avro file encountered, or {@code null} if none is found
     * @throws IOException if listing the path or reading the data file fails
     */
    public static Schema getNewestSchemaFromSource(Path path, FileSystem fileSystem) throws IOException {
        FileStatus[] entries = fileSystem.listStatus(path);
        Arrays.sort(entries, new LastModifiedDescComparator());

        for (FileStatus entry : entries) {
            Path entryPath = entry.getPath();
            if (!entry.isDirectory()) {
                if (FilenameUtils.isExtension(entryPath.getName(), AVRO)) {
                    return AvroUtils.getSchemaFromDataFile(entryPath, fileSystem);
                }
                continue;
            }
            // Directory: descend, and keep scanning siblings if it holds no Avro file.
            Schema nested = getNewestSchemaFromSource(entryPath, fileSystem);
            if (nested != null) {
                return nested;
            }
        }
        return null;
    }

    /**
     * Resolves the dedup key option from the {@code compaction.job.dedup.key} job property.
     *
     * @return the option named by the property (upper-cased before matching), or
     *         {@link #DEFAULT_DEDUP_KEY_OPTION} when the property is missing or unrecognised
     */
    private DedupKeyOption getDedupKeyOption() {
        if (this.dataset.jobProps().contains(COMPACTION_JOB_DEDUP_KEY)) {
            String configured = this.dataset.jobProps().getProp(COMPACTION_JOB_DEDUP_KEY).toUpperCase();
            return Enums.getIfPresent(DedupKeyOption.class, configured).or(DEFAULT_DEDUP_KEY_OPTION);
        }
        return DEFAULT_DEDUP_KEY_OPTION;
    }

    /**
     * @return {@code true} when the job properties contain
     *         {@code compaction.job.avro.key.schema.loc}
     */
    private boolean keySchemaFileSpecified() {
        return this.dataset.jobProps().contains(COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
    }

    /**
     * @return the path stored under the {@code compaction.job.avro.key.schema.loc}
     *         job property
     */
    private Path getKeySchemaFile() {
        String location = this.dataset.jobProps().getProp(COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
        return new Path(location);
    }

    /**
     * Sets {@code AvroKeyRecursiveCombineFileInputFormat} as the job's input format.
     *
     * @param job the MapReduce job being configured
     */
    @Override // org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner
    protected void setInputFormatClass(Job job) {
        job.setInputFormatClass(AvroKeyRecursiveCombineFileInputFormat.class);
    }

    /**
     * Sets {@code AvroKeyMapper} as the job's mapper.
     *
     * @param job the MapReduce job being configured
     */
    @Override // org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner
    protected void setMapperClass(Job job) {
        job.setMapperClass(AvroKeyMapper.class);
    }

    /**
     * Sets {@link AvroKey} as the map-output key class.
     *
     * @param job the MapReduce job being configured
     */
    @Override // org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner
    protected void setMapOutputKeyClass(Job job) {
        job.setMapOutputKeyClass(AvroKey.class);
    }

    /**
     * Sets {@link AvroValue} as the map-output value class.
     *
     * @param job the MapReduce job being configured
     */
    @Override // org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner
    protected void setMapOutputValueClass(Job job) {
        job.setMapOutputValueClass(AvroValue.class);
    }

    /**
     * Sets {@code AvroKeyCompactorOutputFormat} as the job's output format.
     *
     * @param job the MapReduce job being configured
     */
    @Override // org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner
    protected void setOutputFormatClass(Job job) {
        job.setOutputFormatClass(AvroKeyCompactorOutputFormat.class);
    }

    /**
     * Sets {@code AvroKeyDedupReducer} as the job's reducer.
     *
     * @param job the MapReduce job being configured
     */
    @Override // org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner
    protected void setReducerClass(Job job) {
        job.setReducerClass(AvroKeyDedupReducer.class);
    }

    /**
     * Sets {@link AvroKey} as the job's output key class.
     *
     * @param job the MapReduce job being configured
     */
    @Override // org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner
    protected void setOutputKeyClass(Job job) {
        job.setOutputKeyClass(AvroKey.class);
    }

    /**
     * Sets {@link NullWritable} as the job's output value class.
     *
     * @param job the MapReduce job being configured
     */
    @Override // org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner
    protected void setOutputValueClass(Job job) {
        job.setOutputValueClass(NullWritable.class);
    }

    /**
     * @return a new mutable list containing the single extension {@code avro}
     */
    @Override // org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner
    protected Collection<String> getApplicableFileExtensions() {
        return Lists.newArrayList(AVRO);
    }
}
