package org.apache.flink.ml.common.broadcast.operator;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.iteration.operator.OperatorStateUtils;
import org.apache.flink.ml.common.broadcast.BroadcastContext;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/ml/common/broadcast/operator/BroadcastVariableReceiverOperator.class */
public class BroadcastVariableReceiverOperator<OUT> extends AbstractStreamOperatorV2<OUT> implements MultipleInputStreamOperator<OUT>, BoundedMultiInput, Serializable {
    private final String[] broadcastStreamNames;
    private final TypeInformation<?>[] inTypes;
    private final List<Input> inputList;
    private boolean[] cachesReady;
    private ListState[] cacheStates;
    private ListState<Boolean>[] cacheReadyStates;

    /* loaded from: input_file:org/apache/flink/ml/common/broadcast/operator/BroadcastVariableReceiverOperator$ProxyInput.class */
    private class ProxyInput<IN, OT> extends AbstractInput<IN, OT> {
        public ProxyInput(AbstractStreamOperatorV2<OT> abstractStreamOperatorV2, int i) {
            super(abstractStreamOperatorV2, i);
        }

        public void processElement(StreamRecord<IN> streamRecord) throws Exception {
            BroadcastVariableReceiverOperator.this.cacheStates[this.inputId - 1].add(streamRecord.getValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BroadcastVariableReceiverOperator(StreamOperatorParameters<OUT> streamOperatorParameters, String[] strArr, TypeInformation<?>[] typeInformationArr) {
        super(streamOperatorParameters, strArr.length);
        this.broadcastStreamNames = strArr;
        this.inTypes = typeInformationArr;
        this.inputList = new ArrayList();
        for (int i = 0; i < typeInformationArr.length; i++) {
            this.inputList.add(new ProxyInput(this, i + 1));
        }
        this.cachesReady = new boolean[typeInformationArr.length];
        this.cacheStates = new ListState[typeInformationArr.length];
        this.cacheReadyStates = new ListState[typeInformationArr.length];
    }

    public List<Input> getInputs() {
        return this.inputList;
    }

    public void endInput(int i) throws Exception {
        this.cachesReady[i - 1] = true;
        String str = this.broadcastStreamNames[i - 1] + "-" + getRuntimeContext().getIndexOfThisSubtask();
        BroadcastContext.putBroadcastVariable(str, Tuple2.of(true, IteratorUtils.toList(((Iterable) this.cacheStates[i - 1].get()).iterator())));
        BroadcastContext.notifyCacheFinished(str);
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        for (int i = 0; i < this.inTypes.length; i++) {
            this.cacheReadyStates[i].clear();
            this.cacheReadyStates[i].add(Boolean.valueOf(this.cachesReady[i]));
        }
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        for (int i = 0; i < this.inTypes.length; i++) {
            this.cacheStates[i] = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("cache_data_" + i, this.inTypes[i]));
            this.cacheReadyStates[i] = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("cache_ready_state_" + i, BasicTypeInfo.BOOLEAN_TYPE_INFO));
            BroadcastContext.putBroadcastVariable(this.broadcastStreamNames[i] + "-" + getRuntimeContext().getIndexOfThisSubtask(), Tuple2.of(Boolean.valueOf(((Boolean) OperatorStateUtils.getUniqueElement(this.cacheReadyStates[i], "cache_ready_state_" + i).orElse(false)).booleanValue()), IteratorUtils.toList(((Iterable) this.cacheStates[i].get()).iterator())));
        }
    }
}
