package datafu.hourglass.mapreduce;

import datafu.hourglass.fs.DateRange;
import datafu.hourglass.jobs.DateRangeConfigurable;
import datafu.hourglass.model.Accumulator;
import datafu.hourglass.schemas.PartitionCollapsingSchemas;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.mapreduce.ReduceContext;

/* loaded from: input_file:datafu/hourglass/mapreduce/CollapsingCombiner.class */
public class CollapsingCombiner extends ObjectReducer implements DateRangeConfigurable, Serializable {
    private Accumulator<GenericRecord, GenericRecord> _accumulator;
    private boolean _reusePreviousOutput;
    private PartitionCollapsingSchemas _schemas;
    private long _beginTime;
    private long _endTime;

    @Override // datafu.hourglass.mapreduce.ObjectReducer
    public void reduce(Object obj, Iterable<Object> iterable, ReduceContext<Object, Object, Object, Object> reduceContext) throws IOException, InterruptedException {
        GenericRecord genericRecord;
        Accumulator<GenericRecord, GenericRecord> accumulator = getAccumulator();
        if (accumulator == null) {
            throw new RuntimeException("No combiner factory set");
        }
        long j = 0;
        accumulator.cleanup();
        Iterator<Object> it = iterable.iterator();
        while (it.hasNext()) {
            GenericRecord genericRecord2 = (GenericRecord) ((AvroValue) it.next()).datum();
            if (genericRecord2.getSchema().getFullName().equals(getSchemas().getIntermediateValueSchema().getFullName())) {
                accumulator.accumulate(genericRecord2);
                j++;
            } else if (genericRecord2.getSchema().getFullName().equals(getSchemas().getDatedIntermediateValueSchema().getFullName())) {
                if (!this._reusePreviousOutput) {
                    throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
                }
                Long l = (Long) genericRecord2.get("time");
                GenericData.Record record = (GenericData.Record) genericRecord2.get("value");
                if (l == null) {
                    throw new RuntimeException("time is null");
                }
                if (record == null) {
                    throw new RuntimeException("value is null");
                }
                if (l.longValue() >= this._beginTime && l.longValue() <= this._endTime) {
                    accumulator.accumulate(record);
                    j++;
                } else {
                    if (l.longValue() >= this._beginTime) {
                        throw new RuntimeException(String.format("Time %d is greater than end time %d", l, Long.valueOf(this._endTime)));
                    }
                    reduceContext.write((AvroKey) obj, new AvroValue(genericRecord2));
                }
            } else {
                if (!genericRecord2.getSchema().getFullName().equals(getSchemas().getOutputValueSchema().getFullName())) {
                    throw new RuntimeException("Unexpected type: " + genericRecord2.getSchema().getFullName());
                }
                if (!this._reusePreviousOutput) {
                    throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName());
                }
                reduceContext.write((AvroKey) obj, new AvroValue(genericRecord2));
            }
        }
        if (j <= 0 || (genericRecord = accumulator.getFinal()) == null) {
            return;
        }
        reduceContext.write((AvroKey) obj, new AvroValue(genericRecord));
    }

    public void setSchemas(PartitionCollapsingSchemas partitionCollapsingSchemas) {
        this._schemas = partitionCollapsingSchemas;
    }

    public PartitionCollapsingSchemas getSchemas() {
        return this._schemas;
    }

    public boolean getReuseOutput() {
        return this._reusePreviousOutput;
    }

    public void setReuseOutput(boolean z) {
        this._reusePreviousOutput = z;
    }

    public Accumulator<GenericRecord, GenericRecord> getAccumulator() {
        return this._accumulator;
    }

    public void setAccumulator(Accumulator<GenericRecord, GenericRecord> accumulator) {
        this._accumulator = cloneAccumulator(accumulator);
    }

    @Override // datafu.hourglass.jobs.DateRangeConfigurable
    public void setOutputDateRange(DateRange dateRange) {
        this._beginTime = dateRange.getBeginDate().getTime();
        this._endTime = dateRange.getEndDate().getTime();
    }

    private Accumulator<GenericRecord, GenericRecord> cloneAccumulator(Accumulator<GenericRecord, GenericRecord> accumulator) {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            objectOutputStream.writeObject(accumulator);
            objectOutputStream.close();
            byteArrayOutputStream.close();
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
            ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
            Accumulator<GenericRecord, GenericRecord> accumulator2 = (Accumulator) objectInputStream.readObject();
            objectInputStream.close();
            byteArrayInputStream.close();
            return accumulator2;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (ClassNotFoundException e2) {
            throw new RuntimeException(e2);
        }
    }
}
