package org.apache.flink.hadoopcompatibility.mapred;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/* loaded from: input_file:org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.class */
/**
 * Adapter that lets a Hadoop {@code mapred} {@link Reducer} be used as a Flink
 * {@link RichGroupReduceFunction} over {@link Tuple2} key/value records.
 *
 * <p>Serialization note: all fields are transient and custom {@code writeObject} /
 * {@code readObject} hooks are used. Only the reducer's <em>class</em> and the {@link JobConf}
 * are written; on deserialization a new reducer is created via
 * {@link InstantiationUtil#instantiate(Class)}. Instance state held by the reducer object passed to
 * the constructor is therefore not transferred.
 * NOTE(review): this presumably requires the reducer class to be instantiable without arguments
 * - confirm against {@code InstantiationUtil}.
 *
 * @param <KEYIN> key type consumed by the wrapped reducer
 * @param <VALUEIN> value type consumed by the wrapped reducer
 * @param <KEYOUT> key type emitted by the wrapped reducer
 * @param <VALUEOUT> value type emitted by the wrapped reducer
 */
public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
    private static final long serialVersionUID = 1;

    // All state is transient: reducer and jobConf are handled by writeObject/readObject,
    // the remaining helpers are created in open().
    private transient Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;
    private transient JobConf jobConf;
    private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
    private transient HadoopOutputCollector<KEYOUT, VALUEOUT> reduceCollector;
    private transient Reporter reporter;

    /**
     * Wraps the given reducer using a freshly created {@link JobConf}.
     *
     * @param reducer the Hadoop reducer to wrap; must not be {@code null}
     * @throws NullPointerException if {@code reducer} is {@code null}
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer) {
        this(reducer, new JobConf());
    }

    /**
     * Wraps the given reducer; {@code jobConf} is handed to {@link Reducer#configure(JobConf)}
     * when this function is opened.
     *
     * @param reducer the Hadoop reducer to wrap; must not be {@code null}
     * @param jobConf the job configuration for the reducer; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer, JobConf jobConf) {
        if (reducer == null) {
            throw new NullPointerException("Reducer may not be null.");
        }
        if (jobConf == null) {
            throw new NullPointerException("JobConf may not be null.");
        }
        this.reducer = reducer;
        this.jobConf = jobConf;
    }

    /**
     * Configures the wrapped reducer with the job configuration and creates the reporter,
     * the output collector and the value iterator used by {@link #reduce(Iterable, Collector)}.
     *
     * @param configuration passed through to the superclass {@code open}
     * @throws Exception if the superclass {@code open} or the setup fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.reducer.configure(this.jobConf);

        this.reporter = new HadoopDummyReporter();
        this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();

        // Type argument 0 of Reducer is KEYIN; the value iterator is built with a serializer
        // for that key type. The cast is unchecked because the class is resolved reflectively.
        @SuppressWarnings("unchecked")
        Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, this.reducer.getClass(), 0);
        this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(
                TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()));
    }

    /**
     * Closes the wrapped reducer and then the superclass.
     *
     * <p>The reducer is configured in {@link #open(Configuration)}; closing it here completes the
     * Hadoop configure / reduce / close lifecycle so that reducers which release resources in
     * {@code close()} are not leaked. The superclass is closed even if the reducer's
     * {@code close()} throws.
     *
     * @throws Exception if closing the reducer or the superclass fails
     */
    @Override
    public void close() throws Exception {
        try {
            this.reducer.close();
        } finally {
            super.close();
        }
    }

    /**
     * Runs the wrapped reducer on one group of records.
     *
     * @param iterable the tuples of the group; fed to the reducer through the value iterator
     * @param collector the Flink collector that receives the reducer's output via the
     *     Hadoop output collector
     * @throws Exception if the wrapped reducer fails
     */
    @Override
    public void reduce(Iterable<Tuple2<KEYIN, VALUEIN>> iterable, Collector<Tuple2<KEYOUT, VALUEOUT>> collector) throws Exception {
        // Re-point the reusable helpers at this invocation's input and output.
        this.reduceCollector.setFlinkCollector(collector);
        this.valueIterator.set(iterable.iterator());

        KEYIN currentKey = this.valueIterator.getCurrentKey();
        this.reducer.reduce(currentKey, this.valueIterator, this.reduceCollector, this.reporter);
    }

    /**
     * Derives the produced tuple type from the wrapped reducer's class: type arguments 2 and 3
     * of {@link Reducer} are used as the tuple's key and value types.
     *
     * @return type information for {@code Tuple2<KEYOUT, VALUEOUT>}
     */
    @Override
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
        // Unchecked: the output classes are resolved reflectively from the reducer's class.
        @SuppressWarnings("unchecked")
        Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, this.reducer.getClass(), 2);
        @SuppressWarnings("unchecked")
        Class<VALUEOUT> outValueClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, this.reducer.getClass(), 3);

        TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
        TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(outValueClass);
        return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypeInfo);
    }

    /**
     * Custom serialization hook: writes the reducer's class (not the instance) followed by the
     * job configuration.
     *
     * @param out the stream to write to
     * @throws IOException if writing fails
     */
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(this.reducer.getClass());
        this.jobConf.write(out);
    }

    /**
     * Custom deserialization hook: reads the reducer class, instantiates a new reducer from it,
     * and restores the job configuration into a new {@link JobConf}.
     *
     * @param in the stream to read from
     * @throws IOException if reading fails
     * @throws ClassNotFoundException if the serialized reducer class cannot be resolved
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        // Unchecked: the stream only carries a Class object; its type arguments are erased.
        @SuppressWarnings("unchecked")
        Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass =
                (Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
        this.reducer = InstantiationUtil.instantiate(reducerClass);

        this.jobConf = new JobConf();
        this.jobConf.readFields(in);
    }
}
