/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.state;

import java.util.EnumMap;
import java.util.concurrent.ExecutionException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.state.InMemoryBagUserStateFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(value=JUnit4.class)
public class StateRequestHandlersTest {
    @Test
    public void testDelegatingStateHandlerDelegates() throws Exception {
        StateRequestHandler mockHandler = (StateRequestHandler)Mockito.mock(StateRequestHandler.class);
        StateRequestHandler mockHandler2 = (StateRequestHandler)Mockito.mock(StateRequestHandler.class);
        EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlers = new EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler>(BeamFnApi.StateKey.TypeCase.class);
        handlers.put(BeamFnApi.StateKey.TypeCase.TYPE_NOT_SET, mockHandler);
        handlers.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, mockHandler2);
        BeamFnApi.StateRequest request = BeamFnApi.StateRequest.getDefaultInstance();
        BeamFnApi.StateRequest request2 = BeamFnApi.StateRequest.newBuilder().setStateKey(BeamFnApi.StateKey.newBuilder().setMultimapSideInput(BeamFnApi.StateKey.MultimapSideInput.getDefaultInstance())).build();
        StateRequestHandlers.delegateBasedUponType(handlers).handle(request);
        StateRequestHandlers.delegateBasedUponType(handlers).handle(request2);
        ((StateRequestHandler)Mockito.verify((Object)mockHandler)).handle(request);
        ((StateRequestHandler)Mockito.verify((Object)mockHandler2)).handle(request2);
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{mockHandler, mockHandler2});
    }

    @Test
    public void testDelegatingStateHandlerThrowsWhenNotFound() throws Exception {
        StateRequestHandlers.delegateBasedUponType(new EnumMap(BeamFnApi.StateKey.TypeCase.class)).handle(BeamFnApi.StateRequest.getDefaultInstance());
    }

    @Test
    public void testUserStateCacheTokenGeneration() throws Exception {
        ExecutableStage stage = StateRequestHandlersTest.buildExecutableStage("state1", "state2");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"id", (ExecutableStage)stage, (Endpoints.ApiServiceDescriptor)Endpoints.ApiServiceDescriptor.getDefaultInstance());
        InMemoryBagUserStateFactory inMemoryBagUserStateFactory = new InMemoryBagUserStateFactory();
        MatcherAssert.assertThat((Object)inMemoryBagUserStateFactory.handlers.size(), (Matcher)Matchers.is((Object)0));
        StateRequestHandler stateRequestHandler = StateRequestHandlers.forBagUserStateHandlerFactory((ProcessBundleDescriptors.ExecutableProcessBundleDescriptor)descriptor, (StateRequestHandlers.BagUserStateHandlerFactory)inMemoryBagUserStateFactory);
        BeamFnApi.ProcessBundleRequest.CacheToken cacheToken = StateRequestHandlersTest.assertSingleCacheToken(stateRequestHandler);
        StateRequestHandlersTest.sendGetRequest(stateRequestHandler, "state1");
        MatcherAssert.assertThat((Object)inMemoryBagUserStateFactory.handlers.size(), (Matcher)Matchers.is((Object)1));
        MatcherAssert.assertThat((Object)StateRequestHandlersTest.assertSingleCacheToken(stateRequestHandler), (Matcher)Matchers.is((Object)cacheToken));
        StateRequestHandlersTest.sendGetRequest(stateRequestHandler, "state2");
        MatcherAssert.assertThat((Object)inMemoryBagUserStateFactory.handlers.size(), (Matcher)Matchers.is((Object)2));
        MatcherAssert.assertThat((Object)StateRequestHandlersTest.assertSingleCacheToken(stateRequestHandler), (Matcher)Matchers.is((Object)cacheToken));
    }

    private static BeamFnApi.ProcessBundleRequest.CacheToken assertSingleCacheToken(StateRequestHandler stateRequestHandler) {
        Iterable cacheTokens = stateRequestHandler.getCacheTokens();
        MatcherAssert.assertThat((Object)Iterables.size((Iterable)cacheTokens), (Matcher)Matchers.is((Object)1));
        BeamFnApi.ProcessBundleRequest.CacheToken cacheToken = (BeamFnApi.ProcessBundleRequest.CacheToken)Iterables.getOnlyElement((Iterable)cacheTokens);
        MatcherAssert.assertThat((Object)cacheToken.getToken(), (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
        MatcherAssert.assertThat((Object)cacheToken.getUserState(), (Matcher)Matchers.is((Object)BeamFnApi.ProcessBundleRequest.CacheToken.UserState.getDefaultInstance()));
        return cacheToken;
    }

    private static void sendGetRequest(StateRequestHandler stateRequestHandler, String userStateName) throws Exception {
        stateRequestHandler.handle(BeamFnApi.StateRequest.newBuilder().setGet(BeamFnApi.StateGetRequest.getDefaultInstance()).setStateKey(BeamFnApi.StateKey.newBuilder().setBagUserState(BeamFnApi.StateKey.BagUserState.newBuilder().setKey(ByteString.copyFromUtf8((String)"key")).setWindow(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)GlobalWindow.Coder.INSTANCE, (Object)GlobalWindow.INSTANCE))).setTransformId("transform").setUserStateId(userStateName)).build()).build()).toCompletableFuture().get();
    }

    private static ExecutableStage buildExecutableStage(String ... userStateNames) {
        RunnerApi.ExecutableStagePayload.Builder builder = RunnerApi.ExecutableStagePayload.newBuilder().setInput("input").setComponents(RunnerApi.Components.newBuilder().putWindowingStrategies("window", RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId("windowCoder").build()).putPcollections("input", RunnerApi.PCollection.newBuilder().setWindowingStrategyId("window").setCoderId("coder").build()).putCoders("windowCoder", RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN).build()).build()).putCoders("coder", RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build()).addComponentCoderIds("keyCoder").addComponentCoderIds("valueCoder").build()).putCoders("keyCoder", RunnerApi.Coder.getDefaultInstance()).putCoders("valueCoder", RunnerApi.Coder.getDefaultInstance()).putTransforms("transform", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:pardo:v1").build()).putInputs("input", "input").build()).build());
        for (String userStateName : userStateNames) {
            builder.addUserStates(RunnerApi.ExecutableStagePayload.UserStateId.newBuilder().setTransformId("transform").setLocalName(userStateName).build());
        }
        return ExecutableStage.fromPayload((RunnerApi.ExecutableStagePayload)builder.build());
    }
}

