package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.OperatorStateBackend;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.class */
public class FlinkBroadcastStateInternals<K> implements StateInternals {
    private int indexInSubtaskGroup;
    private final OperatorStateBackend stateBackend;
    private Map<String, Map<String, ?>> stateForNonZeroOperator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals$AbstractBroadcastState.class */
    /**
     * Shared storage logic for the broadcast state cells of the enclosing
     * {@link FlinkBroadcastStateInternals}.
     *
     * <p>Each cell keeps a map from {@link StateNamespace#stringKey()} to its value. When the
     * enclosing instance has {@code indexInSubtaskGroup == 0} the map is read from and written to
     * the Flink union list state registered under the cell's name. For every other index the map
     * is held in the enclosing instance's {@code stateForNonZeroOperator}; it is seeded once from
     * the union list state, which is then cleared.
     *
     * @param <T> type of the value stored per namespace
     */
    public abstract class AbstractBroadcastState<T> {
        private final String name;
        private final StateNamespace namespace;
        private final ListStateDescriptor<Map<String, T>> flinkStateDescriptor;
        private final OperatorStateStore flinkStateBackend;

        /**
         * @param operatorStateBackend Flink operator state store used for the union list state
         * @param str name of the state cell; also used as the Flink state descriptor name
         * @param stateNamespace namespace whose string key addresses the value inside the map
         * @param coder coder for the per-namespace value
         */
        AbstractBroadcastState(OperatorStateBackend operatorStateBackend, String str, StateNamespace stateNamespace, Coder<T> coder) {
            this.name = str;
            this.namespace = stateNamespace;
            this.flinkStateBackend = operatorStateBackend;
            CoderTypeInformation<Map<String, T>> typeInfo =
                    new CoderTypeInformation<>(MapCoder.of(StringUtf8Coder.of(), coder));
            this.flinkStateDescriptor = new ListStateDescriptor<>(str, typeInfo.createSerializer(new ExecutionConfig()));
        }

        /**
         * Returns the namespace-to-value map for this cell, or {@code null} if none exists yet.
         *
         * @throws Exception if the Flink state store fails
         */
        Map<String, T> getMap() throws Exception {
            if (FlinkBroadcastStateInternals.this.indexInSubtaskGroup == 0) {
                return getMapFromBroadcastState();
            }
            // Values are stored under this cell's name only by this class, always as Map<String, T>.
            @SuppressWarnings("unchecked")
            Map<String, T> cached = (Map<String, T>) FlinkBroadcastStateInternals.this.stateForNonZeroOperator.get(this.name);
            if (cached != null) {
                return cached;
            }
            Map<String, T> restored = getMapFromBroadcastState();
            if (restored != null) {
                FlinkBroadcastStateInternals.this.stateForNonZeroOperator.put(this.name, restored);
                // The in-memory copy is authoritative from now on; drop the union list state copy.
                this.flinkStateBackend.getUnionListState(this.flinkStateDescriptor).clear();
            }
            return restored;
        }

        /**
         * Reads the first map stored in the union list state, or {@code null} if the state is
         * absent or empty. Only the first element is ever consulted.
         *
         * @throws Exception if the Flink state store fails
         */
        Map<String, T> getMapFromBroadcastState() throws Exception {
            ListState<Map<String, T>> unionState = this.flinkStateBackend.getUnionListState(this.flinkStateDescriptor);
            Iterable<Map<String, T>> contents = unionState.get();
            if (contents == null) {
                return null;
            }
            Iterator<Map<String, T>> it = contents.iterator();
            return it.hasNext() ? it.next() : null;
        }

        /**
         * Stores {@code map} as the new content of this cell. An empty map removes the cell's
         * entry instead of storing it.
         *
         * @param map the complete namespace-to-value map; must not be {@code null}
         * @throws Exception if the Flink state store fails
         */
        void updateMap(Map<String, T> map) throws Exception {
            if (FlinkBroadcastStateInternals.this.indexInSubtaskGroup != 0) {
                if (map.isEmpty()) {
                    FlinkBroadcastStateInternals.this.stateForNonZeroOperator.remove(this.name);
                } else {
                    FlinkBroadcastStateInternals.this.stateForNonZeroOperator.put(this.name, map);
                }
                return;
            }
            ListState<Map<String, T>> unionState = this.flinkStateBackend.getUnionListState(this.flinkStateDescriptor);
            unionState.clear();
            if (!map.isEmpty()) {
                unionState.add(map);
            }
        }

        /**
         * Writes {@code t} as the value for this cell's namespace.
         *
         * @throws RuntimeException wrapping any failure of the underlying state access
         */
        void writeInternal(T t) {
            try {
                Map<String, T> map = getMap();
                if (map == null) {
                    map = new HashMap<>();
                }
                map.put(this.namespace.stringKey(), t);
                updateMap(map);
            } catch (Exception e) {
                throw new RuntimeException("Error updating state.", e);
            }
        }

        /**
         * Returns the value stored for this cell's namespace, or {@code null} if there is none.
         *
         * @throws RuntimeException wrapping any failure of the underlying state access
         */
        T readInternal() {
            try {
                Map<String, T> map = getMap();
                return map == null ? null : map.get(this.namespace.stringKey());
            } catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        /**
         * Removes the value stored for this cell's namespace, if a map exists.
         *
         * @throws RuntimeException wrapping any failure of the underlying state access
         */
        void clearInternal() {
            try {
                Map<String, T> map = getMap();
                if (map != null) {
                    map.remove(this.namespace.stringKey());
                    updateMap(map);
                }
            } catch (Exception e) {
                throw new RuntimeException("Error clearing state.", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals$FlinkBroadcastBagState.class */
    public class FlinkBroadcastBagState<T> extends FlinkBroadcastStateInternals<K>.AbstractBroadcastState<List<T>> implements BagState<T> {
        private final StateNamespace namespace;
        private final StateTag<BagState<T>> address;

        FlinkBroadcastBagState(OperatorStateBackend operatorStateBackend, StateTag<BagState<T>> stateTag, StateNamespace stateNamespace, Coder<T> coder) {
            super(operatorStateBackend, stateTag.getId(), stateNamespace, ListCoder.of(coder));
            this.namespace = stateNamespace;
            this.address = stateTag;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v2, types: [java.util.List] */
        /* JADX WARN: Type inference failed for: r0v4, types: [java.util.List] */
        /* JADX WARN: Type inference failed for: r0v7, types: [java.util.ArrayList] */
        public void add(T t) {
            T t2 = (List) readInternal();
            if (t2 == null) {
                t2 = new ArrayList();
            }
            t2.add(t);
            writeInternal(t2);
        }

        /* renamed from: readLater, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public BagState<T> m47readLater() {
            return this;
        }

        /* renamed from: read, reason: merged with bridge method [inline-methods] */
        public Iterable<T> m48read() {
            List list = (List) readInternal();
            return list != null ? list : Collections.emptyList();
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals.FlinkBroadcastBagState.1
                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Boolean m49read() {
                    try {
                        return Boolean.valueOf(FlinkBroadcastBagState.this.readInternal() == null);
                    } catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public void clear() {
            clearInternal();
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            FlinkBroadcastBagState flinkBroadcastBagState = (FlinkBroadcastBagState) obj;
            return this.namespace.equals(flinkBroadcastBagState.namespace) && this.address.equals(flinkBroadcastBagState.address);
        }

        public int hashCode() {
            return (31 * this.namespace.hashCode()) + this.address.hashCode();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals$FlinkBroadcastValueState.class */
    private class FlinkBroadcastValueState<T> extends FlinkBroadcastStateInternals<K>.AbstractBroadcastState<T> implements ValueState<T> {
        private final StateNamespace namespace;
        private final StateTag<ValueState<T>> address;

        FlinkBroadcastValueState(OperatorStateBackend operatorStateBackend, StateTag<ValueState<T>> stateTag, StateNamespace stateNamespace, Coder<T> coder) {
            super(operatorStateBackend, stateTag.getId(), stateNamespace, coder);
            this.namespace = stateNamespace;
            this.address = stateTag;
        }

        public void write(T t) {
            writeInternal(t);
        }

        /* renamed from: readLater, reason: merged with bridge method [inline-methods] */
        public ValueState<T> m50readLater() {
            return this;
        }

        public T read() {
            return readInternal();
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            FlinkBroadcastValueState flinkBroadcastValueState = (FlinkBroadcastValueState) obj;
            return this.namespace.equals(flinkBroadcastValueState.namespace) && this.address.equals(flinkBroadcastValueState.address);
        }

        public int hashCode() {
            return (31 * this.namespace.hashCode()) + this.address.hashCode();
        }

        public void clear() {
            clearInternal();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals$FlinkCombiningState.class */
    /**
     * {@link CombiningState} that keeps a single accumulator per namespace in the broadcast state
     * map of {@link AbstractBroadcastState} and delegates combining to a
     * {@link Combine.CombineFn}.
     *
     * @param <InputT> type of values added to the state
     * @param <AccumT> type of the stored accumulator
     * @param <OutputT> type produced by {@link #read()}
     */
    public class FlinkCombiningState<InputT, AccumT, OutputT> extends FlinkBroadcastStateInternals<K>.AbstractBroadcastState<AccumT> implements CombiningState<InputT, AccumT, OutputT> {
        private final StateNamespace namespace;
        private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
        private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;

        /**
         * @param operatorStateBackend Flink operator state store backing the accumulator
         * @param stateTag tag identifying the state; its id is used as the state name
         * @param combineFn combine function applied to inputs and accumulators
         * @param stateNamespace namespace the state lives in
         * @param coder coder for the accumulator
         */
        FlinkCombiningState(OperatorStateBackend operatorStateBackend, StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace stateNamespace, Coder<AccumT> coder) {
            super(operatorStateBackend, stateTag.getId(), stateNamespace, coder);
            this.namespace = stateNamespace;
            this.address = stateTag;
            this.combineFn = combineFn;
        }

        /** Returns this state unchanged; no prefetching is performed. */
        @Override
        public CombiningState<InputT, AccumT, OutputT> readLater() {
            return this;
        }

        /** Decompiler-assigned alias of {@link #readLater()}, kept for existing callers. */
        public CombiningState<InputT, AccumT, OutputT> m52readLater() {
            return readLater();
        }

        /** Folds {@code inputt} into the stored accumulator, creating one if none is stored. */
        @Override
        public void add(InputT inputt) {
            AccumT accum = readInternal();
            if (accum == null) {
                accum = this.combineFn.createAccumulator();
            }
            writeInternal(this.combineFn.addInput(accum, inputt));
        }

        /** Merges {@code accumt} into the stored accumulator, or stores it if none exists. */
        @Override
        public void addAccum(AccumT accumt) {
            AccumT current = readInternal();
            if (current == null) {
                writeInternal(accumt);
            } else {
                writeInternal(this.combineFn.mergeAccumulators(Arrays.asList(current, accumt)));
            }
        }

        /** Returns the stored accumulator, or {@code null} when none is stored. */
        @Override
        public AccumT getAccum() {
            return readInternal();
        }

        /** Merges the given accumulators with the combine function; stored state is not touched. */
        @Override
        public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
            return this.combineFn.mergeAccumulators(iterable);
        }

        /** Extracts the output from the stored accumulator, or from a fresh one if none is stored. */
        @Override
        public OutputT read() {
            AccumT accum = readInternal();
            if (accum == null) {
                accum = this.combineFn.createAccumulator();
            }
            return this.combineFn.extractOutput(accum);
        }

        /** Returns a view that reports {@code true} while no accumulator is stored. */
        @Override
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() {
                @Override
                public Boolean read() {
                    try {
                        return Boolean.valueOf(FlinkCombiningState.this.readInternal() == null);
                    } catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                @Override
                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        /** Removes the stored accumulator for this namespace. */
        @Override
        public void clear() {
            clearInternal();
        }

        /** Two combining states are equal when their namespace and state tag are equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            FlinkBroadcastStateInternals<?>.FlinkCombiningState<?, ?, ?> other =
                    (FlinkBroadcastStateInternals<?>.FlinkCombiningState<?, ?, ?>) obj;
            return this.namespace.equals(other.namespace) && this.address.equals(other.address);
        }

        @Override
        public int hashCode() {
            return (31 * this.namespace.hashCode()) + this.address.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals$FlinkCombiningStateWithContext.class */
    /**
     * {@link CombiningState} that keeps a single accumulator per namespace in the broadcast state
     * map of {@link AbstractBroadcastState} and delegates combining to a
     * {@link CombineWithContext.CombineFnWithContext}, passing the supplied context to every call.
     *
     * @param <K2> key type of the owning state internals
     * @param <InputT> type of values added to the state
     * @param <AccumT> type of the stored accumulator
     * @param <OutputT> type produced by {@link #read()}
     */
    public class FlinkCombiningStateWithContext<K2, InputT, AccumT, OutputT> extends FlinkBroadcastStateInternals<K>.AbstractBroadcastState<AccumT> implements CombiningState<InputT, AccumT, OutputT> {
        private final StateNamespace namespace;
        private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
        private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
        private final FlinkBroadcastStateInternals<K2> flinkStateInternals;
        private final CombineWithContext.Context context;

        /**
         * @param operatorStateBackend Flink operator state store backing the accumulator
         * @param stateTag tag identifying the state; its id is used as the state name
         * @param combineFnWithContext context-aware combine function
         * @param stateNamespace namespace the state lives in
         * @param coder coder for the accumulator
         * @param flinkBroadcastStateInternals owning state internals (stored, not otherwise used here)
         * @param context context handed to every combine function call
         */
        FlinkCombiningStateWithContext(OperatorStateBackend operatorStateBackend, StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext, StateNamespace stateNamespace, Coder<AccumT> coder, FlinkBroadcastStateInternals<K2> flinkBroadcastStateInternals, CombineWithContext.Context context) {
            super(operatorStateBackend, stateTag.getId(), stateNamespace, coder);
            this.namespace = stateNamespace;
            this.address = stateTag;
            this.combineFn = combineFnWithContext;
            this.flinkStateInternals = flinkBroadcastStateInternals;
            this.context = context;
        }

        /** Returns this state unchanged; no prefetching is performed. */
        @Override
        public CombiningState<InputT, AccumT, OutputT> readLater() {
            return this;
        }

        /** Decompiler-assigned alias of {@link #readLater()}, kept for existing callers. */
        public CombiningState<InputT, AccumT, OutputT> m55readLater() {
            return readLater();
        }

        /**
         * Folds {@code inputt} into the stored accumulator, creating one if none is stored.
         *
         * @throws RuntimeException wrapping any failure of the combine function or state access
         */
        @Override
        public void add(InputT inputt) {
            try {
                AccumT accum = readInternal();
                if (accum == null) {
                    accum = this.combineFn.createAccumulator(this.context);
                }
                writeInternal(this.combineFn.addInput(accum, inputt, this.context));
            } catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        /**
         * Merges {@code accumt} into the stored accumulator, or stores it if none exists.
         *
         * @throws RuntimeException wrapping any failure of the combine function or state access
         */
        @Override
        public void addAccum(AccumT accumt) {
            try {
                AccumT current = readInternal();
                if (current == null) {
                    writeInternal(accumt);
                } else {
                    writeInternal(this.combineFn.mergeAccumulators(Arrays.asList(current, accumt), this.context));
                }
            } catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        /** Returns the stored accumulator, or {@code null} when none is stored. */
        @Override
        public AccumT getAccum() {
            try {
                return readInternal();
            } catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        /** Merges the given accumulators with the combine function; stored state is not touched. */
        @Override
        public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
            return this.combineFn.mergeAccumulators(iterable, this.context);
        }

        /**
         * Extracts the output from the stored accumulator. When nothing is stored, a fresh
         * accumulator is used instead of passing {@code null} to the combine function, matching
         * the other combining state classes in this file.
         *
         * @throws RuntimeException wrapping any failure of the combine function or state access
         */
        @Override
        public OutputT read() {
            try {
                AccumT accum = readInternal();
                if (accum == null) {
                    accum = this.combineFn.createAccumulator(this.context);
                }
                return this.combineFn.extractOutput(accum, this.context);
            } catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        /** Returns a view that reports {@code true} while no accumulator is stored. */
        @Override
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() {
                @Override
                public Boolean read() {
                    try {
                        return Boolean.valueOf(FlinkCombiningStateWithContext.this.readInternal() == null);
                    } catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                @Override
                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        /** Removes the stored accumulator for this namespace. */
        @Override
        public void clear() {
            clearInternal();
        }

        /** Two states are equal when their namespace and state tag are equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            FlinkBroadcastStateInternals<?>.FlinkCombiningStateWithContext<?, ?, ?, ?> other =
                    (FlinkBroadcastStateInternals<?>.FlinkCombiningStateWithContext<?, ?, ?, ?>) obj;
            return this.namespace.equals(other.namespace) && this.address.equals(other.address);
        }

        @Override
        public int hashCode() {
            return (31 * this.namespace.hashCode()) + this.address.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals$FlinkKeyedCombiningState.class */
    /**
     * {@link CombiningState} that keeps a single accumulator per namespace in the broadcast state
     * map of {@link AbstractBroadcastState}, delegates combining to a {@link Combine.CombineFn},
     * and wraps every failure in a {@link RuntimeException}.
     *
     * @param <K2> key type of the owning state internals
     * @param <InputT> type of values added to the state
     * @param <AccumT> type of the stored accumulator
     * @param <OutputT> type produced by {@link #read()}
     */
    public class FlinkKeyedCombiningState<K2, InputT, AccumT, OutputT> extends FlinkBroadcastStateInternals<K>.AbstractBroadcastState<AccumT> implements CombiningState<InputT, AccumT, OutputT> {
        private final StateNamespace namespace;
        private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
        private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
        private final FlinkBroadcastStateInternals<K2> flinkStateInternals;

        /**
         * @param operatorStateBackend Flink operator state store backing the accumulator
         * @param stateTag tag identifying the state; its id is used as the state name
         * @param combineFn combine function applied to inputs and accumulators
         * @param stateNamespace namespace the state lives in
         * @param coder coder for the accumulator
         * @param flinkBroadcastStateInternals owning state internals (stored, not otherwise used here)
         */
        FlinkKeyedCombiningState(OperatorStateBackend operatorStateBackend, StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace stateNamespace, Coder<AccumT> coder, FlinkBroadcastStateInternals<K2> flinkBroadcastStateInternals) {
            super(operatorStateBackend, stateTag.getId(), stateNamespace, coder);
            this.namespace = stateNamespace;
            this.address = stateTag;
            this.combineFn = combineFn;
            this.flinkStateInternals = flinkBroadcastStateInternals;
        }

        /** Returns this state unchanged; no prefetching is performed. */
        @Override
        public CombiningState<InputT, AccumT, OutputT> readLater() {
            return this;
        }

        /** Decompiler-assigned alias of {@link #readLater()}, kept for existing callers. */
        public CombiningState<InputT, AccumT, OutputT> m58readLater() {
            return readLater();
        }

        /**
         * Folds {@code inputt} into the stored accumulator, creating one if none is stored.
         *
         * @throws RuntimeException wrapping any failure of the combine function or state access
         */
        @Override
        public void add(InputT inputt) {
            try {
                AccumT accum = readInternal();
                if (accum == null) {
                    accum = this.combineFn.createAccumulator();
                }
                writeInternal(this.combineFn.addInput(accum, inputt));
            } catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        /**
         * Merges {@code accumt} into the stored accumulator, or stores it if none exists.
         *
         * @throws RuntimeException wrapping any failure of the combine function or state access
         */
        @Override
        public void addAccum(AccumT accumt) {
            try {
                AccumT current = readInternal();
                if (current == null) {
                    writeInternal(accumt);
                } else {
                    writeInternal(this.combineFn.mergeAccumulators(Arrays.asList(current, accumt)));
                }
            } catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        /** Returns the stored accumulator, or {@code null} when none is stored. */
        @Override
        public AccumT getAccum() {
            try {
                return readInternal();
            } catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        /** Merges the given accumulators with the combine function; stored state is not touched. */
        @Override
        public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
            return this.combineFn.mergeAccumulators(iterable);
        }

        /**
         * Extracts the output from the stored accumulator, or from a fresh one if none is stored.
         *
         * @throws RuntimeException wrapping any failure of the combine function or state access
         */
        @Override
        public OutputT read() {
            try {
                AccumT accum = readInternal();
                if (accum == null) {
                    accum = this.combineFn.createAccumulator();
                }
                return this.combineFn.extractOutput(accum);
            } catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        /** Returns a view that reports {@code true} while no accumulator is stored. */
        @Override
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() {
                @Override
                public Boolean read() {
                    try {
                        return Boolean.valueOf(FlinkKeyedCombiningState.this.readInternal() == null);
                    } catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                @Override
                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        /** Removes the stored accumulator for this namespace. */
        @Override
        public void clear() {
            clearInternal();
        }

        /** Two states are equal when their namespace and state tag are equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            FlinkBroadcastStateInternals<?>.FlinkKeyedCombiningState<?, ?, ?, ?> other =
                    (FlinkBroadcastStateInternals<?>.FlinkKeyedCombiningState<?, ?, ?, ?>) obj;
            return this.namespace.equals(other.namespace) && this.address.equals(other.address);
        }

        @Override
        public int hashCode() {
            return (31 * this.namespace.hashCode()) + this.address.hashCode();
        }
    }

    /**
     * Creates state internals backed by the given Flink operator state store.
     *
     * @param i value stored as {@code indexInSubtaskGroup}; when it is 0 the state cells work
     *     directly on the union list state, otherwise an in-memory map is allocated for them
     * @param operatorStateBackend Flink operator state store used by all state cells
     */
    public FlinkBroadcastStateInternals(int i, OperatorStateBackend operatorStateBackend) {
        this.indexInSubtaskGroup = i;
        this.stateBackend = operatorStateBackend;
        if (i == 0) {
            // Index 0 never consults the in-memory map, so it is left unallocated.
            return;
        }
        this.stateForNonZeroOperator = new HashMap<>();
    }

    /**
     * Returns the key associated with these state internals.
     *
     * @return always {@code null}; this implementation does not track a key
     */
    @Nullable
    @Override
    public K getKey() {
        return null;
    }

    /**
     * Binds {@code stateTag} to a broadcast-state-backed cell in {@code stateNamespace}.
     *
     * <p>Value, bag and combining states (with and without context) are supported. Binding a set,
     * map or watermark hold state throws {@link UnsupportedOperationException}.
     *
     * @param stateNamespace namespace the returned cell is scoped to
     * @param stateTag tag describing which kind of state to create
     * @param stateContext context used to build the combine context for context-aware combine functions
     * @return the state cell produced by binding the tag
     */
    public <T extends State> T state(final StateNamespace stateNamespace, StateTag<T> stateTag, final StateContext<?> stateContext) {
        final OperatorStateBackend backend = this.stateBackend;
        StateTag.StateBinder binder = new StateTag.StateBinder() {
            /** Builds the message used for every unsupported state kind. */
            private String notSupported(Class<?> stateType) {
                return String.format("%s is not supported", stateType.getSimpleName());
            }

            public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>> valueTag, Coder<T2> valueCoder) {
                return new FlinkBroadcastValueState<>(backend, valueTag, stateNamespace, valueCoder);
            }

            public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> bagTag, Coder<T2> elemCoder) {
                return new FlinkBroadcastBagState<>(backend, bagTag, stateNamespace, elemCoder);
            }

            public <T2> SetState<T2> bindSet(StateTag<SetState<T2>> setTag, Coder<T2> elemCoder) {
                throw new UnsupportedOperationException(notSupported(SetState.class));
            }

            public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> mapTag, Coder<KeyT> keyCoder, Coder<ValueT> valueCoder) {
                throw new UnsupportedOperationException(notSupported(MapState.class));
            }

            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                return new FlinkCombiningState<>(backend, combiningTag, combineFn, stateNamespace, accumCoder);
            }

            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
                return new FlinkCombiningStateWithContext<>(
                        backend,
                        combiningTag,
                        combineFnWithContext,
                        stateNamespace,
                        accumCoder,
                        FlinkBroadcastStateInternals.this,
                        CombineContextFactory.createFromStateContext(stateContext));
            }

            public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> holdTag, TimestampCombiner timestampCombiner) {
                throw new UnsupportedOperationException(notSupported(WatermarkHoldState.class));
            }
        };
        return (T) stateTag.bind(binder);
    }
}
