package org.apache.giraph.master;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.SuperstepState;
import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.reducers.Reducer;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/master/MasterAggregatorHandler.class */
/**
 * Master-side bookkeeping for reducers, reduced values and broadcast values.
 *
 * <p>Three maps, all keyed by name, carry the state:
 * <ul>
 *   <li>{@code reducerMap} - reducers added through {@code registerReducer}
 *       (or restored by {@link #readFields(DataInput)}); worker results are
 *       merged into them by {@link #acceptReducedValues(DataInput)};</li>
 *   <li>{@code reducedMap} - values moved out of the reducers by
 *       {@link #prepareSuperstep()}, readable through
 *       {@link #getReduced(String)} until {@link #finishSuperstep()} clears
 *       them;</li>
 *   <li>{@code broadcastMap} - values queued by
 *       {@link #broadcast(String, Writable)} until
 *       {@link #sendDataToOwners(MasterClient)} sends and clears them.</li>
 * </ul>
 *
 * <p>This class performs no synchronization of its own.
 */
public class MasterAggregatorHandler implements MasterGlobalCommUsageAggregators, Writable {
    private static final Logger LOG = Logger.getLogger(MasterAggregatorHandler.class);

    /** Registered reducers, keyed by name. */
    private final Map<String, Reducer<Object, Writable>> reducerMap = Maps.newHashMap();
    /** Values queued for broadcast, keyed by name. */
    private final Map<String, Writable> broadcastMap = Maps.newHashMap();
    /** Values taken from the reducers by {@link #prepareSuperstep()}, keyed by name. */
    private final Map<String, Writable> reducedMap = Maps.newHashMap();
    /** Writer obtained from the configuration; receives the reduced values. */
    private final AggregatorWriter aggregatorWriter;
    /** Progress callback invoked after each entry that is sent, received or written. */
    private final Progressable progressable;
    /** Configuration used to create the writer and to deserialize values. */
    private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;

    /**
     * Creates the handler and asks the configuration for an aggregator writer.
     *
     * @param immutableClassesGiraphConfiguration configuration to keep and to create the writer from
     * @param progressable progress callback
     */
    public MasterAggregatorHandler(ImmutableClassesGiraphConfiguration<?, ?, ?> immutableClassesGiraphConfiguration, Progressable progressable) {
        this.progressable = progressable;
        this.conf = immutableClassesGiraphConfiguration;
        this.aggregatorWriter = immutableClassesGiraphConfiguration.createAggregatorWriter();
    }

    /**
     * Registers a reducer whose starting value is the operation's own initial value.
     *
     * @param str name of the reducer
     * @param reduceOperation operation to register; must not be null
     * @throws IllegalArgumentException if {@code reduceOperation} is null, or for any
     *         reason the three-argument overload rejects the registration
     */
    @Override // org.apache.giraph.master.MasterGlobalCommUsageAggregators
    public final <S, R extends Writable> void registerReducer(String str, ReduceOperation<S, R> reduceOperation) {
        // Reject null here: the operation is dereferenced below, so without this
        // guard the caller would get a bare NullPointerException instead of the
        // descriptive IllegalArgumentException the three-argument overload throws.
        if (reduceOperation == null) {
            throw new IllegalArgumentException("null reducer cannot be registered, with name " + str);
        }
        // NOTE(review): "mo2716createInitialValue" looks like a decompiler-mangled
        // name for createInitialValue - confirm against ReduceOperation.
        registerReducer(str, reduceOperation, reduceOperation.mo2716createInitialValue());
    }

    /**
     * Registers a reducer with an explicit starting value.
     *
     * @param str name of the reducer; must not already be registered
     * @param reduceOperation operation to register; must not be null
     * @param r starting value of the reducer; must not be null
     * @throws IllegalArgumentException if the name is taken or an argument is null
     */
    @Override // org.apache.giraph.master.MasterGlobalCommUsageAggregators
    public <S, R extends Writable> void registerReducer(String str, ReduceOperation<S, R> reduceOperation, R r) {
        if (this.reducerMap.containsKey(str)) {
            throw new IllegalArgumentException("Reducer with name " + str + " was already registered,  and is " + this.reducerMap.get(str) + ", and we are trying to  register " + reduceOperation);
        }
        if (reduceOperation == null) {
            throw new IllegalArgumentException("null reducer cannot be registered, with name " + str);
        }
        if (r == null) {
            throw new IllegalArgumentException("global initial value for reducer cannot be null, but is for " + reduceOperation + " with name " + str);
        }
        // reducerMap stores every reducer as Reducer<Object, Writable>, so the
        // concrete S/R parameters have to be dropped with an explicit unchecked
        // cast; a bare diamond cannot infer types that satisfy both the map's
        // value type and the ReduceOperation<S, R> argument.
        @SuppressWarnings("unchecked")
        Reducer<Object, Writable> reducer = (Reducer<Object, Writable>) new Reducer<S, R>(reduceOperation, r);
        this.reducerMap.put(str, reducer);
    }

    /**
     * Looks up a value stored by {@link #prepareSuperstep()}.
     *
     * @param str name of the reducer
     * @return the stored value, or null (after logging a warning) when nothing
     *         is stored under that name
     */
    @Override // org.apache.giraph.master.MasterGlobalCommUsageAggregators
    public <T extends Writable> T getReduced(String str) {
        // The map only knows Writable; the caller picks T.
        @SuppressWarnings("unchecked")
        T value = (T) this.reducedMap.get(str);
        if (value == null) {
            LOG.warn("getReduced: " + AggregatorUtils.getUnregisteredReducerMessage(str, !this.reducedMap.isEmpty(), this.conf));
        }
        return value;
    }

    /**
     * Queues a value to be sent by the next {@link #sendDataToOwners(MasterClient)}.
     *
     * @param str name to broadcast under; must not already be queued
     * @param writable value to broadcast; must not be null
     * @throws IllegalArgumentException if the name is already queued or the value is null
     */
    @Override // org.apache.giraph.master.MasterGlobalCommUsageAggregators
    public void broadcast(String str, Writable writable) {
        if (this.broadcastMap.containsKey(str)) {
            throw new IllegalArgumentException("Value already broadcasted for name " + str);
        }
        if (writable == null) {
            throw new IllegalArgumentException("null cannot be broadcasted");
        }
        this.broadcastMap.put(str, writable);
    }

    /**
     * Moves every registered reducer's value into {@code reducedMap} and then
     * empties {@code reducerMap}. A reducer whose current value is null
     * contributes a freshly created initial value instead.
     *
     * @throws IllegalStateException if {@code reducedMap} or {@code broadcastMap} is not empty
     */
    public void prepareSuperstep() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("prepareSuperstep: Start preparing reducers");
        }
        Preconditions.checkState(this.reducedMap.isEmpty(), "reducedMap must be empty before start of the superstep");
        Preconditions.checkState(this.broadcastMap.isEmpty(), "broadcastMap must be empty before start of the superstep");
        for (Map.Entry<String, Reducer<Object, Writable>> entry : this.reducerMap.entrySet()) {
            Reducer<Object, Writable> reducer = entry.getValue();
            Writable value = reducer.getCurrentValue();
            if (value == null) {
                value = reducer.createInitialValue();
            }
            this.reducedMap.put(entry.getKey(), value);
        }
        this.reducerMap.clear();
        if (LOG.isDebugEnabled()) {
            LOG.debug("prepareSuperstep: Aggregators prepared");
        }
    }

    /** Discards all values held in {@code reducedMap}. */
    public void finishSuperstep() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("finishSuperstep: Start finishing aggregators");
        }
        this.reducedMap.clear();
        if (LOG.isDebugEnabled()) {
            LOG.debug("finishSuperstep: Aggregators finished");
        }
    }

    /**
     * Sends every registered reduce operation and every queued broadcast value
     * through the given client, reporting progress after each one, then clears
     * the broadcast queue.
     *
     * @param masterClient client used for sending
     * @throws IllegalStateException wrapping any {@link IOException} raised while sending
     */
    public void sendDataToOwners(MasterClient masterClient) {
        try {
            for (Map.Entry<String, Reducer<Object, Writable>> entry : this.reducerMap.entrySet()) {
                masterClient.sendToOwner(entry.getKey(), GlobalCommType.REDUCE_OPERATIONS, entry.getValue().getReduceOp());
                this.progressable.progress();
            }
            for (Map.Entry<String, Writable> entry : this.broadcastMap.entrySet()) {
                masterClient.sendToOwner(entry.getKey(), GlobalCommType.BROADCAST, entry.getValue());
                this.progressable.progress();
            }
            masterClient.finishSendingValues();
            this.broadcastMap.clear();
        } catch (IOException e) {
            throw new IllegalStateException("finishSuperstep: IOException occurred while sending aggregators", e);
        }
    }

    /**
     * Reads one batch of reduced values and folds each into its registered reducer.
     *
     * <p>Layout read from the input: an int count, then per entry a UTF name, a
     * one-byte {@link GlobalCommType} ordinal and the value's own fields.
     *
     * @param dataInput input to read the batch from
     * @throws IOException if reading fails
     * @throws IllegalStateException if the type byte is not a valid ordinal, the
     *         type is not {@code REDUCED_VALUE}, or the name is not registered
     */
    public void acceptReducedValues(DataInput dataInput) throws IOException {
        // values() copies the constant array on every call, so fetch it once.
        GlobalCommType[] commTypes = GlobalCommType.values();
        int numValues = dataInput.readInt();
        for (int i = 0; i < numValues; i++) {
            String name = dataInput.readUTF();
            int typeOrdinal = dataInput.readByte();
            // A corrupt or mismatched stream must fail with a clear message rather
            // than an ArrayIndexOutOfBoundsException from the array lookup.
            if (typeOrdinal < 0 || typeOrdinal >= commTypes.length) {
                throw new IllegalStateException("acceptReducedValues: Unknown GlobalCommType ordinal " + typeOrdinal + " received for " + name);
            }
            GlobalCommType type = commTypes[typeOrdinal];
            if (type != GlobalCommType.REDUCED_VALUE) {
                throw new IllegalStateException("SendReducedToMasterRequest received " + type);
            }
            Reducer<Object, Writable> reducer = this.reducerMap.get(name);
            if (reducer == null) {
                throw new IllegalStateException("acceptReducedValues: Master received reduced value which isn't registered: " + name);
            }
            // A fresh initial value serves as the container to deserialize into.
            Writable received = reducer.createInitialValue();
            received.readFields(dataInput);
            if (reducer.getCurrentValue() != null) {
                reducer.reduceMerge(received);
            } else {
                reducer.setCurrentValue(received);
            }
            this.progressable.progress();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("acceptReducedValues: Accepted one set with " + numValues + " aggregated values");
        }
    }

    /**
     * Hands the current reduced values to the aggregator writer.
     *
     * @param j superstep number passed to the writer, replaced by -1 when
     *        {@code superstepState} is {@code ALL_SUPERSTEPS_DONE}
     * @param superstepState state of the superstep
     * @throws IllegalStateException wrapping any {@link IOException} raised by the writer
     */
    public void writeAggregators(long j, SuperstepState superstepState) {
        long superstepToWrite = superstepState == SuperstepState.ALL_SUPERSTEPS_DONE ? -1L : j;
        try {
            this.aggregatorWriter.writeAggregator(this.reducedMap.entrySet(), superstepToWrite);
        } catch (IOException e) {
            throw new IllegalStateException("coordinateSuperstep: IOException while writing aggregators data", e);
        }
    }

    /**
     * Initializes the aggregator writer with the service's context and
     * application attempt.
     *
     * @param bspService service supplying the context and application attempt
     * @throws IllegalStateException wrapping any {@link IOException} raised by the writer
     */
    public void initialize(BspService bspService) {
        try {
            this.aggregatorWriter.initialize(bspService.getContext(), bspService.getApplicationAttempt());
        } catch (IOException e) {
            throw new IllegalStateException("initialize: Couldn't initialize aggregatorWriter", e);
        }
    }

    /**
     * Closes the aggregator writer.
     *
     * @throws IOException if the writer fails to close
     */
    public void close() throws IOException {
        this.aggregatorWriter.close();
    }

    /**
     * Serializes the registered reducers followed by the queued broadcast values.
     *
     * <p>Layout: reducer count, then (UTF name, reducer) pairs; broadcast count,
     * then (UTF name, writable object) pairs. {@link #readFields(DataInput)}
     * reads the same layout back.
     *
     * @param dataOutput output to write to
     * @throws IOException if writing fails
     * @throws IllegalStateException if {@code reducedMap} is not empty
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        Preconditions.checkState(this.reducedMap.isEmpty(), "reducedMap must be empty at the end of the superstep");
        dataOutput.writeInt(this.reducerMap.size());
        for (Map.Entry<String, Reducer<Object, Writable>> entry : this.reducerMap.entrySet()) {
            dataOutput.writeUTF(entry.getKey());
            entry.getValue().write(dataOutput);
            this.progressable.progress();
        }
        dataOutput.writeInt(this.broadcastMap.size());
        for (Map.Entry<String, Writable> entry : this.broadcastMap.entrySet()) {
            dataOutput.writeUTF(entry.getKey());
            WritableUtils.writeWritableObject(entry.getValue(), dataOutput);
        }
    }

    /**
     * Replaces the whole state with what {@link #write(DataOutput)} produced:
     * all three maps are cleared, then reducers and broadcast values are read back.
     *
     * @param dataInput input to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.reducedMap.clear();
        this.broadcastMap.clear();
        this.reducerMap.clear();
        int numReducers = dataInput.readInt();
        for (int i = 0; i < numReducers; i++) {
            String name = dataInput.readUTF();
            Reducer<Object, Writable> reducer = new Reducer<>();
            reducer.readFields(dataInput, this.conf);
            this.reducerMap.put(name, reducer);
        }
        int numBroadcast = dataInput.readInt();
        for (int i = 0; i < numBroadcast; i++) {
            // Name first, then value: argument evaluation order matches write().
            String name = dataInput.readUTF();
            Writable value = WritableUtils.readWritableObject(dataInput, this.conf);
            this.broadcastMap.put(name, value);
        }
    }
}
