package datafu.hourglass.mapreduce;

import datafu.hourglass.fs.DateRange;
import datafu.hourglass.jobs.DateRangeConfigurable;
import datafu.hourglass.model.Accumulator;
import datafu.hourglass.model.Merger;
import datafu.hourglass.schemas.PartitionCollapsingSchemas;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.mapreduce.ReduceContext;

/* loaded from: input_file:datafu/hourglass/mapreduce/CollapsingReducer.class */
public class CollapsingReducer extends ObjectReducer implements DateRangeConfigurable, Serializable {
    protected long _beginTime;
    protected long _endTime;
    private Accumulator<GenericRecord, GenericRecord> _newAccumulator;
    private Accumulator<GenericRecord, GenericRecord> _oldAccumulator;
    private Merger<GenericRecord> _merger;
    private Merger<GenericRecord> _oldMerger;
    private boolean _reusePreviousOutput;
    private PartitionCollapsingSchemas _schemas;

    @Override // datafu.hourglass.mapreduce.ObjectReducer
    public void reduce(Object obj, Iterable<Object> iterable, ReduceContext<Object, Object, Object, Object> reduceContext) throws IOException, InterruptedException {
        GenericData.Record record;
        if (this._newAccumulator == null) {
            throw new RuntimeException("No reducer set");
        }
        GenericRecord genericRecord = (GenericRecord) ((AvroKey) obj).datum();
        Accumulator<GenericRecord, GenericRecord> newAccumulator = getNewAccumulator();
        newAccumulator.cleanup();
        long j = 0;
        Accumulator<GenericRecord, GenericRecord> accumulator = null;
        long j2 = 0;
        if (getReuseOutput()) {
            accumulator = getOldAccumulator();
            accumulator.cleanup();
        }
        GenericData.Record record2 = null;
        Iterator<Object> it = iterable.iterator();
        while (it.hasNext()) {
            GenericData.Record record3 = (GenericRecord) ((AvroValue) it.next()).datum();
            if (record3.getSchema().getFullName().equals(getSchemas().getIntermediateValueSchema().getFullName())) {
                newAccumulator.accumulate(record3);
                j++;
            } else if (record3.getSchema().getFullName().equals(getSchemas().getDatedIntermediateValueSchema().getFullName())) {
                if (!this._reusePreviousOutput) {
                    throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
                }
                Long l = (Long) record3.get("time");
                GenericData.Record record4 = (GenericData.Record) record3.get("value");
                if (l == null) {
                    throw new RuntimeException("time is null");
                }
                if (record4 == null) {
                    throw new RuntimeException("value is null");
                }
                if (l.longValue() >= this._beginTime && l.longValue() <= this._endTime) {
                    newAccumulator.accumulate(record4);
                    j++;
                } else {
                    if (l.longValue() >= this._beginTime) {
                        throw new RuntimeException(String.format("Time %d is greater than end time %d", l, Long.valueOf(this._endTime)));
                    }
                    accumulator.accumulate(record4);
                    j2++;
                }
            } else {
                if (!record3.getSchema().getFullName().equals(getSchemas().getOutputValueSchema().getFullName())) {
                    throw new RuntimeException("Unexpected type: " + record3.getSchema().getFullName());
                }
                if (!this._reusePreviousOutput) {
                    throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
                }
                record2 = new GenericData.Record(record3, true);
            }
        }
        GenericData.Record record5 = null;
        GenericRecord genericRecord2 = null;
        if (j > 0) {
            record5 = (GenericRecord) newAccumulator.getFinal();
        }
        if (j2 > 0) {
            genericRecord2 = accumulator.getFinal();
        }
        if (record2 == null) {
            record = record5;
            if (genericRecord2 != null) {
                if (this._oldMerger == null) {
                    throw new RuntimeException("No old record merger set");
                }
                record = (GenericRecord) this._oldMerger.merge(record, genericRecord2);
            }
        } else {
            record = record2;
            if (genericRecord2 != null) {
                if (this._oldMerger == null) {
                    throw new RuntimeException("No old record merger set");
                }
                record = (GenericRecord) this._oldMerger.merge(record, genericRecord2);
            }
            if (record5 != null) {
                if (this._merger == null) {
                    throw new RuntimeException("No new record merger set");
                }
                record = (GenericRecord) this._merger.merge(record, record5);
            }
        }
        if (record != null) {
            GenericData.Record record6 = new GenericData.Record(getSchemas().getReduceOutputSchema());
            record6.put("key", genericRecord);
            record6.put("value", record);
            reduceContext.write(new AvroKey(record6), (Object) null);
        }
    }

    public void setSchemas(PartitionCollapsingSchemas partitionCollapsingSchemas) {
        this._schemas = partitionCollapsingSchemas;
    }

    private PartitionCollapsingSchemas getSchemas() {
        return this._schemas;
    }

    public boolean getReuseOutput() {
        return this._reusePreviousOutput;
    }

    public void setReuseOutput(boolean z) {
        this._reusePreviousOutput = z;
    }

    public void setAccumulator(Accumulator<GenericRecord, GenericRecord> accumulator) {
        this._newAccumulator = cloneAccumulator(accumulator);
        this._oldAccumulator = cloneAccumulator(accumulator);
    }

    public Accumulator<GenericRecord, GenericRecord> getNewAccumulator() {
        return this._newAccumulator;
    }

    public Accumulator<GenericRecord, GenericRecord> getOldAccumulator() {
        return this._oldAccumulator;
    }

    public void setRecordMerger(Merger<GenericRecord> merger) {
        this._merger = merger;
    }

    public void setOldRecordMerger(Merger<GenericRecord> merger) {
        this._oldMerger = merger;
    }

    @Override // datafu.hourglass.jobs.DateRangeConfigurable
    public void setOutputDateRange(DateRange dateRange) {
        this._beginTime = dateRange.getBeginDate().getTime();
        this._endTime = dateRange.getEndDate().getTime();
    }

    private Accumulator<GenericRecord, GenericRecord> cloneAccumulator(Accumulator<GenericRecord, GenericRecord> accumulator) {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            objectOutputStream.writeObject(accumulator);
            objectOutputStream.close();
            byteArrayOutputStream.close();
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
            ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
            Accumulator<GenericRecord, GenericRecord> accumulator2 = (Accumulator) objectInputStream.readObject();
            objectInputStream.close();
            byteArrayInputStream.close();
            return accumulator2;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (ClassNotFoundException e2) {
            throw new RuntimeException(e2);
        }
    }
}
