package org.apache.beam.fn.harness.state;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.StateBackedIterable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;

@RunWith(Enclosed.class)
/* loaded from: input_file:org/apache/beam/fn/harness/state/StateBackedIterableTest.class */
public class StateBackedIterableTest {

    @RunWith(JUnit4.class)
    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateBackedIterableTest$CoderTest.class */
    public static class CoderTest {
        @Test
        public void testDecodeEncodeRegularIterable() throws Exception {
            FluentIterable of = FluentIterable.of("A", new String[]{"B", "C"});
            StateBackedIterable.Coder coder = new StateBackedIterable.Coder(() -> {
                return Caches.noop();
            }, (BeamFnStateClient) null, () -> {
                return "instructionId";
            }, StringUtf8Coder.of());
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            coder.encode(of, byteArrayOutputStream);
            Assert.assertEquals(Lists.newArrayList(of), Lists.newArrayList(coder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))));
        }

        @Test
        public void testEncodeDecodeStateBackedIterable() throws Exception {
            StateBackedIterable stateBackedIterable = new StateBackedIterable(Caches.noop(), (BeamFnStateClient) null, "instructionId", StateBackedIterableTest.key("key"), StringUtf8Coder.of(), Arrays.asList("A", "B"));
            StateBackedIterable.Coder coder = new StateBackedIterable.Coder(() -> {
                return Caches.noop();
            }, (BeamFnStateClient) null, () -> {
                return "instructionId";
            }, StringUtf8Coder.of());
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            coder.encode(stateBackedIterable, byteArrayOutputStream);
            StateBackedIterable decode = coder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
            Assert.assertEquals(stateBackedIterable.prefix, decode.prefix);
            Assert.assertEquals(stateBackedIterable.request, decode.request);
        }

        @Test
        public void testSerializability() throws Exception {
            StateBackedIterable stateBackedIterable = new StateBackedIterable(Caches.noop(), new FakeBeamFnStateClient((Coder) StringUtf8Coder.of(), (Map) ImmutableMap.of(StateBackedIterableTest.key("suffix"), Arrays.asList("C", "D", "E"), StateBackedIterableTest.key("emptySuffix"), Arrays.asList(new String[0]))), "instruction", StateBackedIterableTest.key("suffix"), StringUtf8Coder.of(), ImmutableList.of("A", "B"));
            ImmutableList of = ImmutableList.of("A", "B", "C", "D", "E");
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            objectOutputStream.writeObject(stateBackedIterable);
            objectOutputStream.flush();
            Assert.assertEquals(of, Lists.newArrayList((Iterable) new ObjectInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())).readObject()));
            Assert.assertEquals(of, Lists.newArrayList(stateBackedIterable));
        }
    }

    @RunWith(Parameterized.class)
    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateBackedIterableTest$IterationTest.class */
    public static class IterationTest {

        @Parameterized.Parameter(0)
        public List<String> prefix;

        @Parameterized.Parameter(1)
        public String suffixKey;

        @Parameterized.Parameter(2)
        public List<String> expected;

        @Parameterized.Parameters
        public static Iterable<Object[]> data() {
            return ImmutableList.builder().add(new Object[]{Collections.emptyList(), "emptySuffix", ImmutableList.of()}).add(new Object[]{ImmutableList.of("A", "B"), "emptySuffix", ImmutableList.of("A", "B")}).add(new Object[]{Collections.emptyList(), "nonEmptySuffix", ImmutableList.of("C", "D", "E", "F", "G", "H", "I", "J", "K")}).add(new Object[]{ImmutableList.of("A", "B"), "nonEmptySuffix", ImmutableList.of("A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K")}).build();
        }

        @Test
        public void testReiteration() throws Exception {
            StateBackedIterable stateBackedIterable = new StateBackedIterable(Caches.noop(), new FakeBeamFnStateClient((Coder) StringUtf8Coder.of(), (Map) ImmutableMap.of(StateBackedIterableTest.key("nonEmptySuffix"), Arrays.asList("C", "D", "E", "F", "G", "H", "I", "J", "K"), StateBackedIterableTest.key("emptySuffix"), Arrays.asList(new String[0]))), "instruction", StateBackedIterableTest.key(this.suffixKey), StringUtf8Coder.of(), this.prefix);
            Assert.assertEquals(this.expected, Lists.newArrayList(stateBackedIterable));
            Assert.assertEquals(this.expected, Lists.newArrayList(stateBackedIterable));
            Assert.assertEquals(this.expected, Lists.newArrayList(stateBackedIterable));
        }

        @Test
        public void testReiterationCached() throws Exception {
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient((Coder) StringUtf8Coder.of(), (Map) ImmutableMap.of(StateBackedIterableTest.key("nonEmptySuffix"), Arrays.asList("C", "D", "E", "F", "G", "H", "I", "J", "K"), StateBackedIterableTest.key("emptySuffix"), Arrays.asList(new String[0])));
            StateBackedIterable stateBackedIterable = new StateBackedIterable(Caches.eternal(), fakeBeamFnStateClient, "instruction", StateBackedIterableTest.key(this.suffixKey), StringUtf8Coder.of(), this.prefix);
            Assert.assertEquals(0L, fakeBeamFnStateClient.getCallCount());
            Assert.assertEquals(this.expected, Lists.newArrayList(stateBackedIterable));
            int callCount = fakeBeamFnStateClient.getCallCount();
            Assert.assertEquals(this.expected, Lists.newArrayList(stateBackedIterable));
            Assert.assertEquals(this.expected, Lists.newArrayList(stateBackedIterable));
            Assert.assertEquals(callCount, fakeBeamFnStateClient.getCallCount());
        }

        @Test
        public void testCacheKeyIsUnique() throws Exception {
            Cache eternal = Caches.eternal();
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient((Coder) StringUtf8Coder.of(), (Map) ImmutableMap.of(StateBackedIterableTest.key("nonEmptySuffix"), Arrays.asList("C", "D", "E", "F", "G", "H", "I", "J", "K"), StateBackedIterableTest.key("emptySuffix"), Arrays.asList(new String[0]), StateBackedIterableTest.key("otherIterable"), Arrays.asList("Z")));
            StateBackedIterable stateBackedIterable = new StateBackedIterable(eternal, fakeBeamFnStateClient, "instruction", StateBackedIterableTest.key("otherIterable"), StringUtf8Coder.of(), Collections.emptyList());
            Assert.assertEquals(0L, fakeBeamFnStateClient.getCallCount());
            Assert.assertEquals(Arrays.asList("Z"), Lists.newArrayList(stateBackedIterable));
            StateBackedIterable stateBackedIterable2 = new StateBackedIterable(eternal, fakeBeamFnStateClient, "instruction", StateBackedIterableTest.key(this.suffixKey), StringUtf8Coder.of(), this.prefix);
            Assert.assertEquals(this.expected, Lists.newArrayList(stateBackedIterable2));
            int callCount = fakeBeamFnStateClient.getCallCount();
            Assert.assertEquals(this.expected, Lists.newArrayList(stateBackedIterable2));
            Assert.assertEquals(this.expected, Lists.newArrayList(stateBackedIterable2));
            Assert.assertEquals(callCount, fakeBeamFnStateClient.getCallCount());
        }

        @Test
        public void testUsingInterleavedReiteration() throws Exception {
            StateBackedIterable stateBackedIterable = new StateBackedIterable(Caches.noop(), new FakeBeamFnStateClient((Coder) StringUtf8Coder.of(), (Map) ImmutableMap.of(StateBackedIterableTest.key("nonEmptySuffix"), Arrays.asList("C", "D", "E", "F", "G", "H", "I", "J", "K"), StateBackedIterableTest.key("emptySuffix"), Arrays.asList(new String[0]))), "instruction", StateBackedIterableTest.key(this.suffixKey), StringUtf8Coder.of(), this.prefix);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            for (int i = 0; i < 3; i++) {
                arrayList.add(stateBackedIterable.iterator());
                arrayList2.add(new ArrayList());
            }
            Random random = new Random(42L);
            while (!arrayList.isEmpty()) {
                int nextInt = random.nextInt(arrayList.size());
                if (((Iterator) arrayList.get(nextInt)).hasNext()) {
                    ((List) arrayList2.get(nextInt)).add((String) ((Iterator) arrayList.get(nextInt)).next());
                } else {
                    arrayList.remove(nextInt);
                    Assert.assertEquals(this.expected, arrayList2.remove(nextInt));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static BeamFnApi.StateKey key(String str) throws IOException {
        return BeamFnApi.StateKey.newBuilder().setRunner(BeamFnApi.StateKey.Runner.newBuilder().setKey(encode(str))).build();
    }

    private static ByteString encode(String... strArr) throws IOException {
        ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
        for (String str : strArr) {
            StringUtf8Coder.of().encode(str, byteStringOutputStream);
        }
        return byteStringOutputStream.toByteString();
    }
}
