package datafu.hourglass.jobs;

import datafu.hourglass.avro.AvroDateRangeMetadata;
import datafu.hourglass.avro.AvroKeyWithMetadataOutputFormat;
import datafu.hourglass.avro.AvroMultipleInputsKeyInputFormat;
import datafu.hourglass.avro.AvroMultipleInputsUtil;
import datafu.hourglass.fs.DatePath;
import datafu.hourglass.fs.DateRange;
import datafu.hourglass.fs.PathUtils;
import datafu.hourglass.mapreduce.AvroKeyValueIdentityMapper;
import datafu.hourglass.mapreduce.CollapsingCombiner;
import datafu.hourglass.mapreduce.CollapsingMapper;
import datafu.hourglass.mapreduce.CollapsingReducer;
import datafu.hourglass.mapreduce.DelegatingCombiner;
import datafu.hourglass.mapreduce.DelegatingMapper;
import datafu.hourglass.mapreduce.DelegatingReducer;
import datafu.hourglass.mapreduce.DistributedCacheHelper;
import datafu.hourglass.mapreduce.Parameters;
import datafu.hourglass.model.Accumulator;
import datafu.hourglass.model.Mapper;
import datafu.hourglass.model.Merger;
import datafu.hourglass.schemas.PartitionCollapsingSchemas;
import java.io.IOException;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.log4j.Logger;

/* loaded from: input_file:datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.class */
public abstract class AbstractPartitionCollapsingIncrementalJob extends IncrementalJob {
    // Logger for this job; obtained in both constructors for AbstractPartitionCollapsingIncrementalJob.class.
    private final Logger _log;
    // Reports collected by execute(); one is appended after each job that completes successfully.
    private List<Report> _reports;
    // Reuse-previous-output flag; execute() forwards it to the planner, mapper, reducer and combiner.
    protected boolean _reusePreviousOutput;
    // Created in initialize(); execute() registers temp paths with it and cleanup() calls clean() on it.
    private FileCleaner _garbage;

    /* loaded from: input_file:datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob$Report.class */
    /**
     * Describes one job run by the enclosing incremental job: its name and id, where its
     * counters and output were written, and which inputs it consumed.
     *
     * <p>The fields are filled in by the enclosing class; callers only see the read-only
     * accessors below.
     */
    public static class Report {
        private String jobName;
        private String jobId;
        private Path countersPath;
        private DatePath outputPath;
        // Parameterized (instead of raw ArrayList) so the assignments are type-checked.
        private List<DatePath> inputFiles = new ArrayList<DatePath>();
        private List<DatePath> oldInputFiles = new ArrayList<DatePath>();
        private DatePath reusedOutput;

        /**
         * @return the name of the job that was run, or null if not yet set
         */
        public String getJobName() {
            return this.jobName;
        }

        /**
         * @return the id of the job that was run, or null if not yet set
         */
        public String getJobId() {
            return this.jobId;
        }

        /**
         * @return the path the job's counters were reported under, or null if not yet set
         */
        public Path getCountersPath() {
            return this.countersPath;
        }

        /**
         * @return the dated output path the job wrote to, or null if not yet set
         */
        public DatePath getOutputPath() {
            return this.outputPath;
        }

        /**
         * @return the previous output that was fed back in as input, or null if none was reused
         */
        public DatePath getReusedOutput() {
            return this.reusedOutput;
        }

        /**
         * @return a read-only view of the new input paths processed by the job
         */
        public List<DatePath> getInputFiles() {
            return Collections.unmodifiableList(this.inputFiles);
        }

        /**
         * @return a read-only view of the old input paths processed by the job
         */
        public List<DatePath> getOldInputFiles() {
            return Collections.unmodifiableList(this.oldInputFiles);
        }
    }

    /**
     * Creates the job with an empty report list and a logger for this class.
     *
     * @throws IOException declared for parity with the superclass constructor
     */
    public AbstractPartitionCollapsingIncrementalJob() throws IOException {
        this._log = Logger.getLogger(AbstractPartitionCollapsingIncrementalJob.class);
        // Parameterized (instead of raw ArrayList) so the assignment is type-checked.
        this._reports = new ArrayList<Report>();
    }

    /**
     * Creates the job, handing both arguments to the superclass constructor, and sets up an
     * empty report list and a logger for this class.
     *
     * @param str first superclass constructor argument (presumably the job name; confirm against IncrementalJob)
     * @param properties properties passed to the superclass constructor
     * @throws IOException if the superclass constructor fails
     */
    public AbstractPartitionCollapsingIncrementalJob(String str, Properties properties) throws IOException {
        super(str, properties);
        this._log = Logger.getLogger(AbstractPartitionCollapsingIncrementalJob.class);
        // Parameterized (instead of raw ArrayList) so the assignment is type-checked.
        this._reports = new ArrayList<Report>();
    }

    /**
     * Supplies the mapper implementation. execute() installs the returned object on the
     * CollapsingMapper it ships to the job.
     *
     * @return the mapper to use
     */
    public abstract Mapper<GenericRecord, GenericRecord, GenericRecord> getMapper();

    /**
     * Supplies the accumulator for the combiner. execute() only calls this when
     * isUseCombiner() is true, and installs the result on the CollapsingCombiner.
     * This default implementation provides none; subclasses may override.
     *
     * @return the combiner accumulator; null in this default implementation
     */
    public Accumulator<GenericRecord, GenericRecord> getCombinerAccumulator() {
        return null;
    }

    /**
     * Supplies the accumulator for the reducer. execute() installs the returned object on the
     * CollapsingReducer it ships to the job.
     *
     * @return the reducer accumulator
     */
    public abstract Accumulator<GenericRecord, GenericRecord> getReducerAccumulator();

    /**
     * Supplies the record merger that execute() passes to CollapsingReducer.setRecordMerger.
     * This default implementation provides none; subclasses may override.
     * NOTE(review): presumably used to merge new data into reused previous output — confirm
     * against CollapsingReducer.
     *
     * @return the record merger; null in this default implementation
     */
    public Merger<GenericRecord> getRecordMerger() {
        return null;
    }

    /**
     * Supplies the record merger that execute() passes to CollapsingReducer.setOldRecordMerger.
     * This default implementation provides none; subclasses may override.
     * NOTE(review): presumably used to remove old input data from reused previous output —
     * confirm against CollapsingReducer.
     *
     * @return the old-record merger; null in this default implementation
     */
    public Merger<GenericRecord> getOldRecordMerger() {
        return null;
    }

    /**
     * Builds the name handed to PartitionCollapsingSchemas as the output schema name:
     * the simple name of the runtime class followed by "Output".
     *
     * @return the output schema name
     */
    protected String getOutputSchemaName() {
        final String simpleClassName = getClass().getSimpleName();
        return simpleClassName + "Output";
    }

    /**
     * Builds the namespace handed to PartitionCollapsingSchemas as the output schema
     * namespace: the package name of the runtime class.
     *
     * @return the output schema namespace
     */
    protected String getOutputSchemaNamespace() {
        final Package owningPackage = getClass().getPackage();
        return owningPackage.getName();
    }

    /**
     * Applies the given properties via the superclass and then, if the
     * "reuse.previous.output" property is present, parses it as a boolean and stores it
     * through setReusePreviousOutput.
     *
     * @param properties the properties to apply
     */
    @Override // datafu.hourglass.jobs.IncrementalJob, datafu.hourglass.jobs.TimeBasedJob, datafu.hourglass.jobs.AbstractJob
    public void setProperties(Properties properties) {
        super.setProperties(properties);
        final Object reuseSetting = getProperties().get("reuse.previous.output");
        if (reuseSetting == null) {
            // Property absent: leave the current flag untouched.
            return;
        }
        setReusePreviousOutput(Boolean.parseBoolean((String) reuseSetting));
    }

    /**
     * @return the current value of the reuse-previous-output flag
     */
    public boolean getReusePreviousOutput() {
        return _reusePreviousOutput;
    }

    /**
     * Stores the reuse-previous-output flag.
     *
     * @param z the new value of the flag
     */
    public void setReusePreviousOutput(boolean z) {
        _reusePreviousOutput = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Prepares the job before it runs: creates the FileCleaner, fills in defaults for any
     * unset limits, then delegates to the superclass.
     *
     * <p>Defaults applied when the corresponding getter returns null: max iterations 20;
     * max to process = the number of days if set, otherwise 90; retention count 1.
     */
    @Override // datafu.hourglass.jobs.IncrementalJob, datafu.hourglass.jobs.AbstractJob
    public void initialize() {
        _garbage = new FileCleaner(getFileSystem());

        if (getMaxIterations() == null) {
            setMaxIterations(20);
        }

        if (getMaxToProcess() == null) {
            if (getNumDays() == null) {
                setMaxToProcess(90);
            } else {
                // Fall back to the configured number of days.
                setMaxToProcess(getNumDays());
            }
        }

        if (getRetentionCount() == null) {
            setRetentionCount(1);
        }

        super.initialize();
    }

    /**
     * Runs the job: initialize(), validate(), then execute(), in that order. cleanup() is
     * always invoked afterwards, whether the steps completed or one of them threw.
     *
     * <p>NOTE(review): cleanup() is declared to throw IOException; if it throws from the
     * finally block while another exception is propagating, that earlier exception is lost.
     *
     * @throws IOException propagated from the steps above or from cleanup()
     * @throws InterruptedException propagated from the steps above
     * @throws ClassNotFoundException propagated from the steps above
     */
    @Override // datafu.hourglass.jobs.AbstractJob
    public void run() throws IOException, InterruptedException, ClassNotFoundException {
        try {
            initialize();
            validate();
            execute();
        } finally {
            // Runs on both the success and the failure path.
            cleanup();
        }
    }

    /**
     * Exposes the reports collected so far, one per job that execute() completed.
     *
     * @return a read-only view of the report list
     */
    public List<Report> getReports() {
        final List<Report> readOnlyReports = Collections.unmodifiableList(_reports);
        return readOnlyReports;
    }

    /**
     * Plans and runs jobs in a loop until the planner reports no inputs left to process or
     * no further pass is needed.
     *
     * <p>Each pass builds a new execution plan, configures a staged job from it (inputs,
     * schemas, reducer count, serialized mapper/reducer/combiner implementations), waits for
     * the job, records a Report, applies retention, and - if another pass is needed - calls
     * cleanup() before looping.
     *
     * @throws RuntimeException if the iteration limit is reached with inputs still pending,
     *         or if a job does not complete successfully
     */
    private void execute() throws IOException, InterruptedException, ClassNotFoundException {
        int completedIterations = 0;
        while (true) {
            // --- Build the plan for this pass -------------------------------------------
            PartitionCollapsingExecutionPlanner planner = new PartitionCollapsingExecutionPlanner(getFileSystem(), getProperties());
            planner.setInputPaths(getInputPaths());
            planner.setOutputPath(getOutputPath());
            planner.setStartDate(getStartDate());
            planner.setEndDate(getEndDate());
            planner.setDaysAgo(getDaysAgo());
            planner.setNumDays(getNumDays());
            planner.setMaxToProcess(getMaxToProcess());
            planner.setReusePreviousOutput(getReusePreviousOutput());
            planner.setFailOnMissing(isFailOnMissing());
            planner.createPlan();

            if (planner.getInputsToProcess().size() == 0) {
                _log.info("Nothing to do");
                return;
            }

            // The limit is only enforced when there is still work pending.
            if (completedIterations >= getMaxIterations().intValue()) {
                throw new RuntimeException(String.format("Already completed %d iterations but the max is %d and there are still %d inputs to process", Integer.valueOf(completedIterations), getMaxIterations(), Integer.valueOf(planner.getInputsToProcess().size())));
            }

            // --- Start the report for this pass -----------------------------------------
            Report currentReport = new Report();
            currentReport.inputFiles.addAll(planner.getNewInputsToProcess());
            currentReport.oldInputFiles.addAll(planner.getOldInputsToProcess());
            if (planner.getPreviousOutputToProcess() != null) {
                currentReport.reusedOutput = planner.getPreviousOutputToProcess();
            }

            // --- Create the staged job --------------------------------------------------
            DatePath datedOutput = DatePath.createDatedPath(getOutputPath(), planner.getCurrentDateRange().getEndDate());
            _log.info("Output path: " + datedOutput);

            // Temp path is registered with the cleaner so cleanup() can ask for its removal.
            Path tempPath = createRandomTempPath();
            _garbage.add(tempPath);

            // Input paths argument is null here; inputs are attached via MultipleInputs below.
            StagedOutputJob job = StagedOutputJob.createStagedJob(getConf(), getName() + "-" + PathUtils.datedPathFormat.format(planner.getCurrentDateRange().getEndDate()), null, tempPath.toString(), datedOutput.getPath().toString(), _log);
            job.setCountersParentPath(getCountersParentPath());

            // --- Attach inputs ----------------------------------------------------------
            if (planner.getNewInputsToProcess() != null && planner.getNewInputsToProcess().size() > 0) {
                _log.info("*** New Input data:");
                for (DatePath newInput : planner.getNewInputsToProcess()) {
                    _log.info(newInput.getPath());
                    MultipleInputs.addInputPath(job, newInput.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
                }
            }

            if (planner.getOldInputsToProcess() != null && planner.getOldInputsToProcess().size() > 0) {
                _log.info("*** Old Input data:");
                for (DatePath oldInput : planner.getOldInputsToProcess()) {
                    _log.info(oldInput.getPath());
                    MultipleInputs.addInputPath(job, oldInput.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
                }
            }

            // Previous output is read with a different input format and an identity mapper.
            if (planner.getPreviousOutputToProcess() != null) {
                _log.info("*** Previous output data:");
                _log.info(planner.getPreviousOutputToProcess().getPath());
                MultipleInputs.addInputPath(job, planner.getPreviousOutputToProcess().getPath(), AvroKeyInputFormat.class, AvroKeyValueIdentityMapper.class);
            }

            // --- Configuration and schemas ----------------------------------------------
            Configuration conf = job.getConfiguration();
            config(conf);
            AvroDateRangeMetadata.configureOutputDateRange(conf, planner.getCurrentDateRange());

            PartitionCollapsingSchemas schemas = new PartitionCollapsingSchemas(getSchemas(), planner.getInputSchemasByPath(), getOutputSchemaName(), getOutputSchemaNamespace());
            job.setOutputFormatClass(AvroKeyWithMetadataOutputFormat.class);

            _log.info("Setting input path to schema mappings");
            for (String inputPath : schemas.getMapInputSchemas().keySet()) {
                Schema inputSchema = schemas.getMapInputSchemas().get(inputPath);
                _log.info("*** " + inputPath);
                _log.info("*** => " + inputSchema.toString());
                AvroMultipleInputsUtil.setInputKeySchemaForPath(job, inputSchema, inputPath);
            }

            AvroJob.setMapOutputKeySchema(job, schemas.getMapOutputKeySchema());
            AvroJob.setMapOutputValueSchema(job, schemas.getMapOutputValueSchema());
            AvroJob.setOutputKeySchema(job, schemas.getReduceOutputSchema());

            // --- Reducer count: explicit setting wins over the planner's value ----------
            int reducerCount;
            if (getNumReducers() != null) {
                reducerCount = getNumReducers().intValue();
                _log.info(String.format("Using %d reducers (fixed)", Integer.valueOf(reducerCount)));
            } else {
                reducerCount = planner.getNumReducers();
                _log.info(String.format("Using %d reducers (computed)", Integer.valueOf(reducerCount)));
            }
            job.setNumReduceTasks(reducerCount);
            job.setReducerClass(DelegatingReducer.class);

            // --- Serialize mapper / reducer implementations under the temp path ---------
            Path mapperImplPath = new Path(tempPath, ".mapper_impl");
            Path reducerImplPath = new Path(tempPath, ".reducer_impl");
            Path combinerImplPath = new Path(tempPath, ".combiner_impl");

            CollapsingMapper mapperImpl = new CollapsingMapper();
            CollapsingReducer reducerImpl = new CollapsingReducer();
            mapperImpl.setSchemas(schemas);
            reducerImpl.setSchemas(schemas);
            mapperImpl.setMapper(getMapper());
            reducerImpl.setAccumulator(getReducerAccumulator());
            reducerImpl.setRecordMerger(getRecordMerger());
            reducerImpl.setOldRecordMerger(getOldRecordMerger());
            mapperImpl.setReuseOutput(_reusePreviousOutput);
            reducerImpl.setReuseOutput(_reusePreviousOutput);
            configureOutputDateRange(job.getConfiguration(), planner.getCurrentDateRange(), reducerImpl);

            DistributedCacheHelper.writeObject(conf, mapperImpl, mapperImplPath);
            DistributedCacheHelper.writeObject(conf, reducerImpl, reducerImplPath);
            conf.set(Parameters.REDUCER_IMPL_PATH, reducerImplPath.toString());
            conf.set(Parameters.MAPPER_IMPL_PATH, mapperImplPath.toString());

            // --- Optional combiner ------------------------------------------------------
            if (isUseCombiner()) {
                CollapsingCombiner combinerImpl = new CollapsingCombiner();
                configureOutputDateRange(job.getConfiguration(), planner.getCurrentDateRange(), combinerImpl);
                combinerImpl.setReuseOutput(_reusePreviousOutput);
                combinerImpl.setSchemas(schemas);
                combinerImpl.setAccumulator(getCombinerAccumulator());
                conf.set(Parameters.COMBINER_IMPL_PATH, combinerImplPath.toString());
                job.setCombinerClass(DelegatingCombiner.class);
                DistributedCacheHelper.writeObject(conf, combinerImpl, combinerImplPath);
            }

            // --- Run and record ---------------------------------------------------------
            if (!job.waitForCompletion(true)) {
                _log.error("Job failed! Quitting...");
                throw new RuntimeException("Job failed");
            }

            currentReport.jobId = job.getJobID().toString();
            currentReport.jobName = job.getJobName();
            currentReport.countersPath = job.getCountersPath();
            currentReport.outputPath = datedOutput;
            _reports.add(currentReport);

            applyRetention();

            if (!planner.getNeedsAnotherPass()) {
                return;
            }

            // Another pass follows: clean up this pass's temp data first.
            cleanup();
            completedIterations++;
        }
    }

    /**
     * Applies the retention policy to the output path, if a retention count is set, by
     * delegating to PathUtils.keepLatestDatedPaths with that count.
     *
     * @throws IOException if the underlying file system operation fails
     */
    private void applyRetention() throws IOException {
        if (getRetentionCount() == null) {
            // No retention configured: keep everything.
            return;
        }
        PathUtils.keepLatestDatedPaths(getFileSystem(), getOutputPath(), getRetentionCount().intValue());
    }

    /**
     * Copies the given date range onto a reducer or combiner implementation via
     * setOutputDateRange.
     *
     * <p>A missing begin date is replaced by time 0 and a missing end date by
     * Long.MAX_VALUE milliseconds, so the target always receives a fully bounded range.
     *
     * <p>Fix: the end date's milliseconds used to be computed and then discarded, so the
     * range end was always Long.MAX_VALUE regardless of the plan's end date. The computed
     * value is now used.
     *
     * @param configuration unused; kept so existing call sites stay unchanged
     * @param dateRange the range to apply; either bound may be null
     * @param dateRangeConfigurable the object that receives the resulting range
     */
    private static void configureOutputDateRange(Configuration configuration, DateRange dateRange, DateRangeConfigurable dateRangeConfigurable) {
        Calendar calendar = Calendar.getInstance(PathUtils.timeZone);
        long beginMillis = 0;
        long endMillis = Long.MAX_VALUE;
        if (dateRange.getBeginDate() != null) {
            calendar.setTime(dateRange.getBeginDate());
            beginMillis = calendar.getTimeInMillis();
        }
        if (dateRange.getEndDate() != null) {
            calendar.setTime(dateRange.getEndDate());
            // Previously this value was dropped on the floor.
            endMillis = calendar.getTimeInMillis();
        }
        dateRangeConfigurable.setOutputDateRange(new DateRange(new Date(beginMillis), new Date(endMillis)));
    }

    /**
     * Asks the FileCleaner to clean the paths registered with it. Does nothing when the
     * cleaner has not been created yet (i.e. initialize() has not run).
     *
     * @throws IOException if cleaning fails
     */
    private void cleanup() throws IOException {
        if (_garbage == null) {
            return;
        }
        _garbage.clean();
    }
}
