package datafu.hourglass.jobs;

import datafu.hourglass.avro.AvroMultipleInputsKeyInputFormat;
import datafu.hourglass.avro.AvroMultipleInputsUtil;
import datafu.hourglass.fs.DatePath;
import datafu.hourglass.fs.PathUtils;
import datafu.hourglass.mapreduce.DelegatingCombiner;
import datafu.hourglass.mapreduce.DelegatingMapper;
import datafu.hourglass.mapreduce.DelegatingReducer;
import datafu.hourglass.mapreduce.DistributedCacheHelper;
import datafu.hourglass.mapreduce.ObjectMapper;
import datafu.hourglass.mapreduce.ObjectReducer;
import datafu.hourglass.mapreduce.Parameters;
import datafu.hourglass.mapreduce.PartitioningCombiner;
import datafu.hourglass.mapreduce.PartitioningMapper;
import datafu.hourglass.mapreduce.PartitioningReducer;
import datafu.hourglass.model.Accumulator;
import datafu.hourglass.model.Mapper;
import datafu.hourglass.schemas.PartitionPreservingSchemas;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.Logger;

/* loaded from: input_file:datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.class */
/**
 * Base class for an incremental job whose output is written per input date under the output
 * path, using {@link PathUtils#nestedDatedPathFormat} to name each date's directory.
 *
 * <p>{@link #run()} drives the job: it calls {@link #initialize()}, {@code validate()} and then
 * repeatedly plans and executes a staged MapReduce job until the planner reports that no inputs
 * remain or that no further pass is needed. Temporary paths registered during execution are
 * removed when the run finishes, whether it succeeds or fails.
 *
 * <p>Subclasses supply the map logic via {@link #getMapper()} and the reduce-side aggregation via
 * {@link #getReducerAccumulator()}; {@link #getCombinerAccumulator()} is optional.
 */
public abstract class AbstractPartitionPreservingIncrementalJob extends IncrementalJob {
    /** Iteration cap applied by {@link #initialize()} when none has been configured. */
    private static final int DEFAULT_MAX_ITERATIONS = 20;

    /** Max-to-process value applied by {@link #initialize()} when neither it nor num-days is set. */
    private static final int DEFAULT_MAX_TO_PROCESS = 90;

    private final Logger _log = Logger.getLogger(AbstractPartitionPreservingIncrementalJob.class);

    /** One entry per successfully completed iteration, in execution order. */
    private final List<Report> _reports = new ArrayList<Report>();

    private PartitioningMapper _mapper;
    private PartitioningCombiner _combiner;
    private PartitioningReducer _reducer;

    /** Tracks temporary paths to delete; created in {@link #initialize()}. */
    private FileCleaner _garbage;

    /**
     * Summary of a single completed iteration: the job identity, the inputs it consumed, the
     * dated outputs it produced and, when counters were moved into the output path, their
     * location.
     */
    public static class Report {
        private String jobName;
        private String jobId;
        private Path countersPath;
        private List<DatePath> inputFiles = new ArrayList<DatePath>();
        private List<DatePath> outputFiles = new ArrayList<DatePath>();

        /** @return the name of the job that ran for this iteration */
        public String getJobName() {
            return this.jobName;
        }

        /** @return the ID of the job that ran for this iteration */
        public String getJobId() {
            return this.jobId;
        }

        /**
         * @return where the counters were moved to under the output path, or {@code null} if
         *         they were not moved by this class
         */
        public Path getCountersPath() {
            return this.countersPath;
        }

        /** @return a read-only view of the inputs consumed by this iteration */
        public List<DatePath> getInputFiles() {
            return Collections.unmodifiableList(this.inputFiles);
        }

        /** @return a read-only view of the dated outputs produced by this iteration */
        public List<DatePath> getOutputFiles() {
            return Collections.unmodifiableList(this.outputFiles);
        }
    }

    /**
     * Creates an unnamed job with no properties.
     *
     * @throws IOException declared for compatibility with existing callers and subclasses
     */
    public AbstractPartitionPreservingIncrementalJob() throws IOException {
    }

    /**
     * Creates a job with the given name and configuration properties.
     *
     * @param str the job name, passed to the superclass
     * @param properties the configuration properties, passed to the superclass
     * @throws IOException declared for compatibility with existing callers and subclasses
     */
    public AbstractPartitionPreservingIncrementalJob(String str, Properties properties) throws IOException {
        super(str, properties);
    }

    /** @return the mapper implementation that is wrapped in a {@link PartitioningMapper} */
    public abstract Mapper<GenericRecord, GenericRecord, GenericRecord> getMapper();

    /**
     * Accumulator used by the combiner when {@code isUseCombiner()} is true.
     *
     * @return {@code null} by default; subclasses enabling the combiner should override this
     */
    public Accumulator<GenericRecord, GenericRecord> getCombinerAccumulator() {
        return null;
    }

    /** @return the accumulator that is wrapped in a {@link PartitioningReducer} */
    public abstract Accumulator<GenericRecord, GenericRecord> getReducerAccumulator();

    /**
     * Initializes, validates and executes the job, always cleaning up registered temporary
     * paths afterwards.
     */
    @Override
    public void run() throws IOException, InterruptedException, ClassNotFoundException {
        try {
            initialize();
            validate();
            execute();
        } finally {
            cleanup();
        }
    }

    /** @return a read-only view of the reports for the iterations completed so far */
    public List<Report> getReports() {
        return Collections.unmodifiableList(this._reports);
    }

    /**
     * Creates the temp-file cleaner and fills in defaults before delegating to the superclass:
     * max iterations defaults to {@value #DEFAULT_MAX_ITERATIONS}; max-to-process defaults to
     * the configured number of days, or {@value #DEFAULT_MAX_TO_PROCESS} when that is unset.
     */
    @Override
    public void initialize() {
        this._garbage = new FileCleaner(getFileSystem());
        if (getMaxIterations() == null) {
            setMaxIterations(DEFAULT_MAX_ITERATIONS);
        }
        if (getMaxToProcess() == null) {
            if (getNumDays() != null) {
                setMaxToProcess(getNumDays());
            } else {
                setMaxToProcess(DEFAULT_MAX_TO_PROCESS);
            }
        }
        super.initialize();
    }

    /** @return the output schema name: this class's simple name followed by {@code "Output"} */
    protected String getOutputSchemaName() {
        return getClass().getSimpleName() + "Output";
    }

    /** @return the output schema namespace: the package name of the concrete job class */
    protected String getOutputSchemaNamespace() {
        return getClass().getPackage().getName();
    }

    /** @return the mapper created for the current iteration, or {@code null} before the first one */
    protected ObjectMapper getMapProcessor() {
        return this._mapper;
    }

    /** @return the combiner created for the current iteration, or {@code null} if none was created */
    protected ObjectReducer getCombineProcessor() {
        return this._combiner;
    }

    /** @return the reducer created for the current iteration, or {@code null} before the first one */
    protected ObjectReducer getReduceProcessor() {
        return this._reducer;
    }

    /**
     * Plans and runs staged jobs until the planner has no inputs left or needs no further pass.
     *
     * @throws RuntimeException if the iteration cap is reached with inputs still pending, or if
     *         a job fails
     * @throws IllegalStateException if the planner returns inputs but no dates to process
     */
    private void execute() throws IOException, InterruptedException, ClassNotFoundException {
        for (int iterations = 0; ; iterations++) {
            // A fresh plan is built on every pass so it reflects output written by earlier passes.
            PartitionPreservingExecutionPlanner planner =
                    new PartitionPreservingExecutionPlanner(getFileSystem(), getProperties());
            planner.setInputPaths(getInputPaths());
            planner.setOutputPath(getOutputPath());
            planner.setStartDate(getStartDate());
            planner.setEndDate(getEndDate());
            planner.setDaysAgo(getDaysAgo());
            planner.setNumDays(getNumDays());
            planner.setMaxToProcess(getMaxToProcess());
            planner.setFailOnMissing(isFailOnMissing());
            planner.createPlan();

            if (planner.getInputsToProcess().isEmpty()) {
                this._log.info("Found all necessary incremental data");
                return;
            }
            if (iterations >= getMaxIterations().intValue()) {
                throw new RuntimeException(String.format(
                        "Already completed %d iterations but the max is %d and there are still %d inputs to process",
                        Integer.valueOf(iterations),
                        getMaxIterations(),
                        Integer.valueOf(planner.getInputsToProcess().size())));
            }

            // Everything for this pass lives under one random temp path registered for cleanup.
            Path tempPath = createRandomTempPath();
            this._garbage.add(tempPath);
            ensurePath(getOutputPath());
            Path stagingPath = ensurePath(new Path(tempPath, ".incremental-staging"));
            Path stagingTmpPath = ensurePath(new Path(tempPath, ".incremental-staging-tmp"));

            Report report = new Report();
            List<String> inputPaths = new ArrayList<String>();
            for (DatePath input : planner.getInputsToProcess()) {
                inputPaths.add(input.getPath().toString());
                report.inputFiles.add(input);
            }

            this._log.info("Staging path: " + stagingPath);
            StagedOutputJob job = StagedOutputJob.createStagedJob(
                    getConf(),
                    getName() + "-incremental",
                    inputPaths,
                    stagingTmpPath.toString(),
                    stagingPath.toString(),
                    this._log);
            job.setCountersParentPath(getCountersParentPath());
            Configuration conf = job.getConfiguration();
            config(conf);

            PartitionPreservingSchemas schemas = new PartitionPreservingSchemas(
                    getSchemas(),
                    planner.getInputSchemasByPath(),
                    getOutputSchemaName(),
                    getOutputSchemaNamespace());
            job.setInputFormatClass(AvroMultipleInputsKeyInputFormat.class);
            job.setOutputFormatClass(AvroKeyOutputFormat.class);

            this._log.info("Setting input path to schema mappings");
            for (String inputPath : schemas.getMapInputSchemas().keySet()) {
                Schema inputSchema = schemas.getMapInputSchemas().get(inputPath);
                this._log.info("*** " + inputPath);
                this._log.info("*** => " + inputSchema.toString());
                AvroMultipleInputsUtil.setInputKeySchemaForPath(job, inputSchema, inputPath);
            }
            AvroJob.setMapOutputKeySchema(job, schemas.getMapOutputKeySchema());
            AvroJob.setMapOutputValueSchema(job, schemas.getMapOutputValueSchema());
            AvroJob.setOutputKeySchema(job, schemas.getReduceOutputSchema());

            int dateCount = planner.getDatesToProcess().size();
            if (dateCount == 0) {
                // Fail fast with a clear message instead of a divide-by-zero further down.
                throw new IllegalStateException(String.format(
                        "Planner returned %d inputs to process but no dates to process",
                        Integer.valueOf(planner.getInputsToProcess().size())));
            }

            // One named output per date, plus the comma-separated epoch millis for the partitioner.
            StringBuilder inputTimes = new StringBuilder();
            for (Date date : planner.getDatesToProcess()) {
                String outputName = PathUtils.datedPathFormat.format(date);
                this._log.info(String.format("Adding named output %s", outputName));
                AvroMultipleOutputs.addNamedOutput(
                        job, outputName, AvroKeyOutputFormat.class, schemas.getReduceOutputSchema());
                if (inputTimes.length() > 0) {
                    inputTimes.append(',');
                }
                inputTimes.append(date.getTime());
            }

            int numReducers;
            if (getNumReducers() != null) {
                numReducers = getNumReducers().intValue();
                this._log.info(String.format("Using %d reducers (fixed)", Integer.valueOf(numReducers)));
            } else {
                numReducers = planner.getNumReducers();
                this._log.info(String.format("Using %d reducers (computed)", Integer.valueOf(numReducers)));
            }
            int reducersPerInput = reducersPerInput(numReducers, dateCount);
            this._log.info(String.format("Reducers per input path: %d", Integer.valueOf(reducersPerInput)));
            conf.set(TimePartitioner.REDUCERS_PER_INPUT, Integer.toString(reducersPerInput));
            conf.set(TimePartitioner.INPUT_TIMES, inputTimes.toString());
            job.setNumReduceTasks(numReducers);

            // Serialized processor implementations are written under the staging path and
            // referenced from the job configuration.
            Path mapperImplPath = new Path(stagingPath, ".mapper_impl");
            Path reducerImplPath = new Path(stagingPath, ".reducer_impl");
            Path combinerImplPath = new Path(stagingPath, ".combiner_impl");
            conf.set(Parameters.REDUCER_IMPL_PATH, reducerImplPath.toString());
            conf.set(Parameters.MAPPER_IMPL_PATH, mapperImplPath.toString());

            this._mapper = new PartitioningMapper();
            this._mapper.setSchemas(schemas);
            this._mapper.setMapper(getMapper());
            this._reducer = new PartitioningReducer();
            this._reducer.setSchemas(schemas);
            this._reducer.setAccumulator(getReducerAccumulator());
            DistributedCacheHelper.writeObject(conf, getMapProcessor(), mapperImplPath);
            DistributedCacheHelper.writeObject(conf, getReduceProcessor(), reducerImplPath);
            job.setMapperClass(DelegatingMapper.class);
            job.setReducerClass(DelegatingReducer.class);

            if (isUseCombiner()) {
                this._combiner = new PartitioningCombiner();
                this._combiner.setAccumulator(getCombinerAccumulator());
                conf.set(Parameters.COMBINER_IMPL_PATH, combinerImplPath.toString());
                job.setCombinerClass(DelegatingCombiner.class);
                DistributedCacheHelper.writeObject(conf, getCombineProcessor(), combinerImplPath);
            }
            job.setPartitionerClass(TimePartitioner.class);

            if (!job.waitForCompletion(true)) {
                this._log.error("Job failed! Quitting...");
                throw new RuntimeException("Job failed");
            }

            report.jobName = job.getJobName();
            report.jobId = job.getJobID().toString();
            moveStagedFiles(report, stagingPath);
            moveCountersToOutput(job, report);
            applyRetention();
            this._reports.add(report);

            if (!planner.getNeedsAnotherPass()) {
                return;
            }
            // Remove this pass's temp files before starting the next one.
            cleanup();
        }
    }

    /**
     * Computes how many reducers to give each date, rounding up.
     *
     * <p>The division is done in floating point: integer division would truncate before
     * {@link Math#ceil(double)} is applied, yielding 0 whenever there are fewer reducers than dates.
     *
     * @param numReducers total number of reduce tasks
     * @param dateCount number of dates being processed; must be positive
     * @return {@code ceil(numReducers / dateCount)}
     */
    private static int reducersPerInput(int numReducers, int dateCount) {
        return (int) Math.ceil(numReducers / (double) dateCount);
    }

    /**
     * Moves the job's counters file into the output path when no counters parent path is
     * configured and the job exposes a counters path, replacing any counters file of the same
     * name already there. The report's counters path is recorded only if the move succeeds.
     */
    private void moveCountersToOutput(StagedOutputJob job, Report report) throws IOException {
        if (getCountersParentPath() != null) {
            return;
        }
        Path countersPath = job.getCountersPath();
        if (countersPath == null) {
            return;
        }
        if (!getFileSystem().exists(countersPath)) {
            this._log.error("Could not find counters at " + countersPath);
            return;
        }
        Path target = new Path(getOutputPath(), countersPath.getName());
        if (getFileSystem().exists(target)) {
            this._log.info(String.format("Removing old counters at %s", target));
            getFileSystem().delete(target, true);
        }
        this._log.info(String.format("Moving %s to %s", countersPath.getName(), getOutputPath()));
        if (getFileSystem().rename(countersPath, target)) {
            report.countersPath = target;
        } else {
            // Counters are auxiliary, so a failed move is logged rather than failing the job.
            this._log.error("Failed to move counters from " + countersPath + " to " + target);
        }
    }

    /** Deletes the temporary paths registered so far; a no-op before {@link #initialize()}. */
    private void cleanup() throws IOException {
        if (this._garbage != null) {
            this._garbage.clean();
        }
    }

    /** Applies the retention count to the output path, if one is configured. */
    private void applyRetention() throws IOException {
        if (getRetentionCount() != null) {
            PathUtils.keepLatestNestedDatedPaths(getFileSystem(), getOutputPath(), getRetentionCount().intValue());
        }
    }

    /**
     * Groups the staged files by the prefix before the first {@code '-'} in their name, moves
     * each group into a directory named after that prefix, and then moves each directory to its
     * nested dated location under the output path, recording it in the report.
     *
     * @param report the report to which produced outputs are added
     * @param stagingPath the directory holding the job's staged output files
     * @throws RuntimeException if a group directory already exists, a move fails, or a group
     *         name cannot be parsed with {@link PathUtils#datedPathFormat}
     */
    private void moveStagedFiles(Report report, Path stagingPath) throws IOException {
        this._log.info("Following files produced in staging path:");
        for (FileStatus avroFile : getFileSystem().globStatus(new Path(stagingPath, "*.avro"))) {
            this._log.info(String.format("* %s (%d bytes)", avroFile.getPath(), Long.valueOf(avroFile.getLen())));
        }

        // Only files whose name starts with a numeric prefix are treated as dated outputs.
        FileStatus[] datedFiles = getFileSystem().globStatus(new Path(stagingPath, "*"), new PathFilter() {
            @Override
            public boolean accept(Path candidate) {
                try {
                    Long.parseLong(candidate.getName().split("-")[0]);
                    return true;
                } catch (NumberFormatException e) {
                    return false;
                }
            }
        });

        HashMap<String, Path> directoryByPrefix = new HashMap<String, Path>();
        for (FileStatus datedFile : datedFiles) {
            Path source = datedFile.getPath();
            String prefix = source.getName().split("-")[0];
            Path directory = directoryByPrefix.get(prefix);
            if (directory == null) {
                directory = new Path(stagingPath, prefix);
                if (getFileSystem().exists(directory)) {
                    throw new RuntimeException("already exists: " + directory.toString());
                }
                getFileSystem().mkdirs(directory);
                directoryByPrefix.put(prefix, directory);
            }
            this._log.info(String.format("Moving %s to %s", source.getName(), directory.toString()));
            Path destination = new Path(directory, source.getName());
            // rename reports failure through its return value, so check it rather than lose output.
            if (!getFileSystem().rename(source, destination)) {
                throw new RuntimeException("Failed to rename " + source + " to " + destination);
            }
        }

        for (Path directory : directoryByPrefix.values()) {
            Date date;
            try {
                date = PathUtils.datedPathFormat.parse(directory.getName());
            } catch (ParseException e) {
                throw new RuntimeException(e);
            }
            Path target = new Path(getOutputPath(), PathUtils.nestedDatedPathFormat.format(date));
            this._log.info(String.format("Moving %s to %s", directory.getName(), target));
            getFileSystem().mkdirs(target.getParent());
            if (!getFileSystem().rename(directory, target)) {
                throw new RuntimeException("Failed to rename " + directory + " to " + target);
            }
            report.outputFiles.add(new DatePath(date, target));
        }
    }
}
