package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.fn.harness.state.CachingBeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/state/CachingBeamFnStateClientTest.class */
public class CachingBeamFnStateClientTest {
    private LoadingCache<BeamFnApi.StateKey, Map<CachingBeamFnStateClient.StateCacheKey, BeamFnApi.StateGetResponse>> stateCache;
    private List<BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokenList;
    private BeamFnApi.ProcessBundleRequest.CacheToken userStateToken = BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().setUserState(BeamFnApi.ProcessBundleRequest.CacheToken.UserState.getDefaultInstance()).setToken(ByteString.copyFromUtf8("1")).build();
    private CachingBeamFnStateClient.StateCacheKey defaultCacheKey = CachingBeamFnStateClient.StateCacheKey.create(ByteString.copyFromUtf8("1"), ByteString.EMPTY);
    private CacheLoader<BeamFnApi.StateKey, Map<CachingBeamFnStateClient.StateCacheKey, BeamFnApi.StateGetResponse>> loader = new CacheLoader<BeamFnApi.StateKey, Map<CachingBeamFnStateClient.StateCacheKey, BeamFnApi.StateGetResponse>>() { // from class: org.apache.beam.fn.harness.state.CachingBeamFnStateClientTest.1
        @Override // org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader
        public Map<CachingBeamFnStateClient.StateCacheKey, BeamFnApi.StateGetResponse> load(BeamFnApi.StateKey stateKey) {
            return new HashMap();
        }
    };

    @Before
    public void setup() {
        this.stateCache = CacheBuilder.newBuilder().build(this.loader);
        this.cacheTokenList = new ArrayList();
    }

    @Test
    public void testNoCacheWithoutToken() throws Exception {
        CachingBeamFnStateClient cachingBeamFnStateClient = new CachingBeamFnStateClient(new FakeBeamFnStateClient(ImmutableMap.of(key("A"), encode("A1", "A2", "A3"))), this.stateCache, this.cacheTokenList);
        BeamFnApi.StateRequest.Builder get = BeamFnApi.StateRequest.newBuilder().setStateKey(key("A")).setGet(BeamFnApi.StateGetRequest.newBuilder().build());
        cachingBeamFnStateClient.handle(get);
        Assert.assertEquals(1L, r0.getCallCount());
        get.clearId();
        cachingBeamFnStateClient.handle(get);
        Assert.assertEquals(2L, r0.getCallCount());
    }

    @Test
    public void testCachingUserState() throws Exception {
        FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of(key("A"), encode("A1", "A2", "A3")), 3);
        this.cacheTokenList.add(this.userStateToken);
        CachingBeamFnStateClient cachingBeamFnStateClient = new CachingBeamFnStateClient(fakeBeamFnStateClient, this.stateCache, this.cacheTokenList);
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(key("A")), getALlDataForKey(key("A"), cachingBeamFnStateClient));
        Assert.assertEquals(3L, fakeBeamFnStateClient.getCallCount());
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(key("A")), getALlDataForKey(key("A"), cachingBeamFnStateClient));
        Assert.assertEquals(3L, fakeBeamFnStateClient.getCallCount());
    }

    @Test
    public void testCachingIterableSideInput() throws Exception {
        BeamFnApi.StateKey build = BeamFnApi.StateKey.newBuilder().setIterableSideInput(BeamFnApi.StateKey.IterableSideInput.newBuilder().setTransformId("GBK").setSideInputId("Iterable").build()).build();
        FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of(build, encode("S1", "S2", "S3")), 3);
        this.cacheTokenList.add(sideInputCacheToken("GBK", "Iterable"));
        CachingBeamFnStateClient cachingBeamFnStateClient = new CachingBeamFnStateClient(fakeBeamFnStateClient, this.stateCache, this.cacheTokenList);
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(build), getALlDataForKey(build, cachingBeamFnStateClient));
        Assert.assertEquals(3L, fakeBeamFnStateClient.getCallCount());
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(build), getALlDataForKey(build, cachingBeamFnStateClient));
        Assert.assertEquals(3L, fakeBeamFnStateClient.getCallCount());
    }

    @Test
    public void testCachingMultimapSideInput() throws Exception {
        BeamFnApi.StateKey build = BeamFnApi.StateKey.newBuilder().setMultimapKeysSideInput(BeamFnApi.StateKey.MultimapKeysSideInput.newBuilder().setTransformId("GBK").setSideInputId("Multimap").build()).build();
        BeamFnApi.StateKey build2 = BeamFnApi.StateKey.newBuilder().setMultimapSideInput(BeamFnApi.StateKey.MultimapSideInput.newBuilder().setTransformId("GBK").setSideInputId("Multimap").setKey(encode("K1")).build()).build();
        HashMap hashMap = new HashMap();
        hashMap.put(build, encode("K1", "K2"));
        hashMap.put(build2, encode("V1", "V2", "V3"));
        FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(hashMap, 3);
        this.cacheTokenList.add(sideInputCacheToken("GBK", "Multimap"));
        CachingBeamFnStateClient cachingBeamFnStateClient = new CachingBeamFnStateClient(fakeBeamFnStateClient, this.stateCache, this.cacheTokenList);
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(build), getALlDataForKey(build, cachingBeamFnStateClient));
        Assert.assertEquals(2L, fakeBeamFnStateClient.getCallCount());
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(build), getALlDataForKey(build, cachingBeamFnStateClient));
        Assert.assertEquals(2L, fakeBeamFnStateClient.getCallCount());
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(build2), getALlDataForKey(build2, cachingBeamFnStateClient));
        Assert.assertEquals(5L, fakeBeamFnStateClient.getCallCount());
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(build2), getALlDataForKey(build2, cachingBeamFnStateClient));
        Assert.assertEquals(5L, fakeBeamFnStateClient.getCallCount());
    }

    @Test
    public void testAppendInvalidatesLastPage() throws Exception {
        FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of(key("A"), encode("A1"), key("B"), encode("B1")), 3);
        this.cacheTokenList.add(this.userStateToken);
        CachingBeamFnStateClient cachingBeamFnStateClient = new CachingBeamFnStateClient(fakeBeamFnStateClient, this.stateCache, this.cacheTokenList);
        appendToKey(key("A"), encode("A2"), cachingBeamFnStateClient);
        Assert.assertTrue(this.stateCache.getUnchecked(key("A")).isEmpty());
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(key("A")), getALlDataForKey(key("A"), cachingBeamFnStateClient));
        Assert.assertEquals(3L, fakeBeamFnStateClient.getCallCount());
        appendToKey(key("A"), encode("A3"), cachingBeamFnStateClient);
        Assert.assertFalse(this.stateCache.getUnchecked(key("A")).containsValue(BeamFnApi.StateGetResponse.newBuilder().setData(encode("A2")).build()));
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(key("A")), getALlDataForKey(key("A"), cachingBeamFnStateClient));
        Assert.assertEquals(6L, fakeBeamFnStateClient.getCallCount());
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(key("B")), getALlDataForKey(key("B"), cachingBeamFnStateClient));
        appendToKey(key("B"), encode("B2"), cachingBeamFnStateClient);
        Assert.assertTrue(this.stateCache.getUnchecked(key("B")).isEmpty());
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(key("B")), getALlDataForKey(key("B"), cachingBeamFnStateClient));
        Assert.assertEquals(10L, fakeBeamFnStateClient.getCallCount());
        appendToKey(key("C"), encode("C1"), cachingBeamFnStateClient);
        Assert.assertTrue(this.stateCache.getUnchecked(key("C")).isEmpty());
        Assert.assertEquals(fakeBeamFnStateClient.getData().get(key("C")), getALlDataForKey(key("C"), cachingBeamFnStateClient));
    }

    @Test
    public void testCacheClear() throws Exception {
        FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of(key("A"), encode("A1"), key("B"), encode("B1", "B2")), 3);
        this.cacheTokenList.add(this.userStateToken);
        CachingBeamFnStateClient cachingBeamFnStateClient = new CachingBeamFnStateClient(fakeBeamFnStateClient, this.stateCache, this.cacheTokenList);
        clearKey(key("A"), cachingBeamFnStateClient);
        Assert.assertEquals(1L, fakeBeamFnStateClient.getCallCount());
        Assert.assertNull(fakeBeamFnStateClient.getData().get(key("A")));
        Assert.assertEquals(ByteString.EMPTY, getALlDataForKey(key("A"), cachingBeamFnStateClient));
        Assert.assertEquals(1L, fakeBeamFnStateClient.getCallCount());
        getALlDataForKey(key("B"), cachingBeamFnStateClient);
        clearKey(key("B"), cachingBeamFnStateClient);
        Assert.assertEquals(4L, fakeBeamFnStateClient.getCallCount());
        Assert.assertNull(fakeBeamFnStateClient.getData().get(key("B")));
        Assert.assertEquals(ByteString.EMPTY, getALlDataForKey(key("B"), cachingBeamFnStateClient));
        Assert.assertEquals(4L, fakeBeamFnStateClient.getCallCount());
    }

    private BeamFnApi.StateKey key(String str) throws IOException {
        return BeamFnApi.StateKey.newBuilder().setBagUserState(BeamFnApi.StateKey.BagUserState.newBuilder().setTransformId("ptransformId").setUserStateId("stateId").setWindow(ByteString.copyFromUtf8("encodedWindow")).setKey(encode(str))).build();
    }

    private BeamFnApi.ProcessBundleRequest.CacheToken sideInputCacheToken(String str, String str2) {
        return BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().setSideInput(BeamFnApi.ProcessBundleRequest.CacheToken.SideInput.newBuilder().setTransformId(str).setSideInputId(str2).build()).setToken(ByteString.copyFromUtf8("1")).build();
    }

    private ByteString encode(String... strArr) throws IOException {
        ByteString.Output newOutput = ByteString.newOutput();
        for (String str : strArr) {
            StringUtf8Coder.of().encode(str, (OutputStream) newOutput);
        }
        return newOutput.toByteString();
    }

    private void appendToKey(BeamFnApi.StateKey stateKey, ByteString byteString, CachingBeamFnStateClient cachingBeamFnStateClient) throws Exception {
        cachingBeamFnStateClient.handle(BeamFnApi.StateRequest.newBuilder().setStateKey(stateKey).setAppend(BeamFnApi.StateAppendRequest.newBuilder().setData(byteString))).get();
    }

    private void clearKey(BeamFnApi.StateKey stateKey, CachingBeamFnStateClient cachingBeamFnStateClient) throws Exception {
        cachingBeamFnStateClient.handle(BeamFnApi.StateRequest.newBuilder().setStateKey(stateKey).setClear(BeamFnApi.StateClearRequest.getDefaultInstance())).get();
    }

    private ByteString getALlDataForKey(BeamFnApi.StateKey stateKey, CachingBeamFnStateClient cachingBeamFnStateClient) throws Exception {
        ByteString byteString = ByteString.EMPTY;
        ByteString byteString2 = ByteString.EMPTY;
        BeamFnApi.StateRequest.Builder stateKey2 = BeamFnApi.StateRequest.newBuilder().setStateKey(stateKey);
        do {
            stateKey2.clearId().setGet(BeamFnApi.StateGetRequest.newBuilder().setContinuationToken(byteString));
            CompletableFuture handle = cachingBeamFnStateClient.handle(stateKey2);
            byteString = ((BeamFnApi.StateResponse) handle.get()).getGet().getContinuationToken();
            byteString2 = byteString2.concat(((BeamFnApi.StateResponse) handle.get()).getGet().getData());
        } while (!byteString.equals(ByteString.EMPTY));
        return byteString2;
    }
}
