package org.apache.beam.repackaged.direct_java.runners.fnexecution.state;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.repackaged.direct_java.runners.core.InMemoryStateInternals;
import org.apache.beam.repackaged.direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/fnexecution/state/InMemoryBagUserStateFactory.class */
/**
 * Factory for bag user state handlers whose contents are kept in memory.
 *
 * <p>Every handler created through {@link #forUserState} is remembered in {@link #handlers} so
 * that {@link #resetForNewKey()} can discard the state of all of them at once.
 *
 * @param <K> type of the key the state is associated with
 * @param <V> type of the values stored in the bag
 * @param <W> type of the window the state is scoped to
 */
public class InMemoryBagUserStateFactory<K, V, W extends BoundedWindow> implements StateRequestHandlers.BagUserStateHandlerFactory<K, V, W> {
    /** Handlers handed out so far, in creation order. */
    final List<InMemorySingleKeyBagState<K, V, W>> handlers = new ArrayList<>();

    /**
     * Bag user state handler backed by a single lazily created {@link StateInternals}.
     *
     * <p>The internals are created from the key of the first call made after construction or after
     * {@link #reset()}; until the next {@link #reset()} the key argument of later calls is not
     * consulted, so the handler serves one key at a time.
     */
    static class InMemorySingleKeyBagState<K, V, W extends BoundedWindow> implements StateRequestHandlers.BagUserStateHandler<K, V, W> {
        private final StateTag<BagState<V>> stateTag;
        private final Coder<W> windowCoder;
        /** Created on first access and set back to {@code null} by {@link #reset()}. */
        private volatile StateInternals stateInternals;

        /**
         * @param stateId id used to build the bag state tag
         * @param valueCoder coder for the bag's elements
         * @param windowCoder coder used to build the per-window state namespace
         */
        InMemorySingleKeyBagState(String stateId, Coder<V> valueCoder, Coder<W> windowCoder) {
            this.windowCoder = windowCoder;
            this.stateTag = StateTags.bag(stateId, valueCoder);
        }

        /**
         * Reads the bag stored for the given window.
         *
         * @param k key used to create the state internals if none exist yet
         * @param w window whose bag is read
         * @return the result of {@link BagState#read()} for that window's bag
         */
        @Override
        public Iterable<V> get(K k, W w) {
            return bagFor(k, w).read();
        }

        /**
         * Adds every remaining element of {@code it} to the bag stored for the given window.
         *
         * @param k key used to create the state internals if none exist yet
         * @param w window whose bag is appended to
         * @param it values to add; consumed until exhausted
         */
        @Override
        public void append(K k, W w, Iterator<V> it) {
            BagState<V> bag = bagFor(k, w);
            while (it.hasNext()) {
                bag.add(it.next());
            }
        }

        /**
         * Clears the bag stored for the given window.
         *
         * @param k key used to create the state internals if none exist yet
         * @param w window whose bag is cleared
         */
        @Override
        public void clear(K k, W w) {
            bagFor(k, w).clear();
        }

        /**
         * Looks up the bag for {@code w}, creating the state internals from {@code k} first when
         * none are present.
         */
        private BagState<V> bagFor(K k, W w) {
            // Read the volatile field once and keep using the local: a reset() that runs after the
            // null check can then no longer turn the subsequent dereference into a
            // NullPointerException.
            StateInternals internals = this.stateInternals;
            if (internals == null) {
                internals = InMemoryStateInternals.forKey(k);
                this.stateInternals = internals;
            }
            return internals.state(StateNamespaces.window(this.windowCoder, w), this.stateTag);
        }

        /** Drops the current state internals; the next access creates fresh ones. */
        void reset() {
            this.stateInternals = null;
        }
    }

    /**
     * Creates a new in-memory bag handler and registers it with this factory.
     *
     * @param str not used by this implementation
     * @param str2 id used to build the handler's bag state tag
     * @param coder key coder; not used by this implementation
     * @param coder2 coder for the bag's elements
     * @param coder3 coder used to build the per-window state namespace
     * @return the newly created handler
     */
    @Override
    public StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(String str, String str2, Coder<K> coder, Coder<V> coder2, Coder<W> coder3) {
        InMemorySingleKeyBagState<K, V, W> handler = new InMemorySingleKeyBagState<>(str2, coder2, coder3);
        this.handlers.add(handler);
        return handler;
    }

    /** Resets every handler created by this factory so that each starts over on its next access. */
    public void resetForNewKey() {
        for (InMemorySingleKeyBagState<K, V, W> handler : this.handlers) {
            handler.reset();
        }
    }
}
