package org.apache.beam.runners.samza.runtime;

import java.io.IOException;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.translation.StreamingSideInputHandlerFactory;
import org.apache.beam.runners.fnexecution.wire.ByteStringCoder;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.util.StateUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.samza.context.TaskContext;

/* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStateRequestHandlers.class */
public class SamzaStateRequestHandlers {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStateRequestHandlers$BagUserStateFactory.class */
    public static class BagUserStateFactory<K extends ByteString, V extends ByteString, W extends BoundedWindow> implements StateRequestHandlers.BagUserStateHandlerFactory<K, V, W> {
        private final SamzaStoreStateInternals.Factory<K> stateInternalsFactory;

        BagUserStateFactory(SamzaStoreStateInternals.Factory<K> factory) {
            this.stateInternalsFactory = factory;
        }

        public StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(String str, final String str2, Coder<K> coder, final Coder<V> coder2, final Coder<W> coder3) {
            return (StateRequestHandlers.BagUserStateHandler<K, V, W>) new StateRequestHandlers.BagUserStateHandler<K, V, W>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStateRequestHandlers.BagUserStateFactory.1
                public Iterable<V> get(K k, W w) {
                    return BagUserStateFactory.this.stateInternalsFactory.stateInternalsForKey(k).state(StateNamespaces.window(coder3, w), StateTags.bag(str2, coder2)).read();
                }

                public void append(K k, W w, Iterator<V> it) {
                    BagState state = BagUserStateFactory.this.stateInternalsFactory.stateInternalsForKey(k).state(StateNamespaces.window(coder3, w), StateTags.bag(str2, coder2));
                    while (it.hasNext()) {
                        state.add(it.next());
                    }
                }

                public void clear(K k, W w) {
                    BagUserStateFactory.this.stateInternalsFactory.stateInternalsForKey(k).state(StateNamespaces.window(coder3, w), StateTags.bag(str2, coder2)).clear();
                }

                /* JADX WARN: Multi-variable type inference failed */
                public /* bridge */ /* synthetic */ void clear(Object obj, BoundedWindow boundedWindow) {
                    clear((AnonymousClass1) obj, (ByteString) boundedWindow);
                }

                /* JADX WARN: Multi-variable type inference failed */
                public /* bridge */ /* synthetic */ void append(Object obj, BoundedWindow boundedWindow, Iterator it) {
                    append((AnonymousClass1) obj, (ByteString) boundedWindow, it);
                }

                /* JADX WARN: Multi-variable type inference failed */
                public /* bridge */ /* synthetic */ Iterable get(Object obj, BoundedWindow boundedWindow) {
                    return get((AnonymousClass1) obj, (ByteString) boundedWindow);
                }
            };
        }
    }

    public static StateRequestHandler of(String str, TaskContext taskContext, SamzaPipelineOptions samzaPipelineOptions, ExecutableStage executableStage, StageBundleFactory stageBundleFactory, Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> map, SideInputHandler sideInputHandler) {
        StateRequestHandler createSideInputStateHandler = createSideInputStateHandler(executableStage, map, sideInputHandler);
        StateRequestHandler createUserStateRequestHandler = createUserStateRequestHandler(str, executableStage, taskContext, samzaPipelineOptions, stageBundleFactory);
        EnumMap enumMap = new EnumMap(BeamFnApi.StateKey.TypeCase.class);
        enumMap.put((EnumMap) BeamFnApi.StateKey.TypeCase.ITERABLE_SIDE_INPUT, (BeamFnApi.StateKey.TypeCase) createSideInputStateHandler);
        enumMap.put((EnumMap) BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, (BeamFnApi.StateKey.TypeCase) createSideInputStateHandler);
        enumMap.put((EnumMap) BeamFnApi.StateKey.TypeCase.MULTIMAP_KEYS_SIDE_INPUT, (BeamFnApi.StateKey.TypeCase) createSideInputStateHandler);
        enumMap.put((EnumMap) BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, (BeamFnApi.StateKey.TypeCase) createUserStateRequestHandler);
        return StateRequestHandlers.delegateBasedUponType(enumMap);
    }

    private static StateRequestHandler createSideInputStateHandler(ExecutableStage executableStage, Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> map, SideInputHandler sideInputHandler) {
        if (executableStage.getSideInputs().size() <= 0) {
            return StateRequestHandler.unsupported();
        }
        try {
            return StateRequestHandlers.forSideInputHandlerFactory(ProcessBundleDescriptors.getSideInputs(executableStage), (StateRequestHandlers.SideInputHandlerFactory) Preconditions.checkNotNull(StreamingSideInputHandlerFactory.forStage(executableStage, map, sideInputHandler)));
        } catch (IOException e) {
            throw new RuntimeException("Failed to initialize SideInputHandler", e);
        }
    }

    private static StateRequestHandler createUserStateRequestHandler(String str, ExecutableStage executableStage, TaskContext taskContext, SamzaPipelineOptions samzaPipelineOptions, StageBundleFactory stageBundleFactory) {
        if (!StateUtils.isStateful(executableStage)) {
            return StateRequestHandler.unsupported();
        }
        return StateRequestHandlers.forBagUserStateHandlerFactory(stageBundleFactory.getProcessBundleDescriptor(), new BagUserStateFactory(SamzaStoreStateInternals.createStateInternalsFactory(str, (Coder) ByteStringCoder.of(), taskContext, samzaPipelineOptions, executableStage)));
    }
}
