package org.apache.beam.fn.harness.state;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.beam.fn.harness.state.StateFetchingIterators;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/* loaded from: input_file:org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.class */
public class StateFetchingIteratorsTest {

    @RunWith(JUnit4.class)
    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateFetchingIteratorsTest$LazyBlockingStateFetchingIteratorTest.class */
    public static class LazyBlockingStateFetchingIteratorTest {
        @Test
        public void testEmpty() throws Exception {
            testFetch(ByteString.EMPTY);
        }

        @Test
        public void testNonEmpty() throws Exception {
            testFetch(ByteString.copyFromUtf8("A"));
        }

        @Test
        public void testWithLastByteStringBeingEmpty() throws Exception {
            testFetch(ByteString.copyFromUtf8("A"), ByteString.EMPTY);
        }

        @Test
        public void testMulti() throws Exception {
            testFetch(ByteString.copyFromUtf8("BC"), ByteString.copyFromUtf8("DEF"));
        }

        @Test
        public void testMultiWithEmptyByteStrings() throws Exception {
            testFetch(ByteString.EMPTY, ByteString.copyFromUtf8("BC"), ByteString.EMPTY, ByteString.EMPTY, ByteString.copyFromUtf8("DEF"), ByteString.EMPTY);
        }

        private BeamFnStateClient fakeStateClient(AtomicInteger atomicInteger, ByteString... byteStringArr) {
            return (builder, completableFuture) -> {
                atomicInteger.incrementAndGet();
                if (byteStringArr.length == 0) {
                    completableFuture.complete(BeamFnApi.StateResponse.newBuilder().setId(builder.getId()).setGet(BeamFnApi.StateGetResponse.newBuilder()).build());
                    return;
                }
                ByteString continuationToken = builder.getGet().getContinuationToken();
                int i = 0;
                if (!ByteString.EMPTY.equals(continuationToken)) {
                    i = Integer.parseInt(continuationToken.toStringUtf8());
                }
                ByteString byteString = ByteString.EMPTY;
                if (i != byteStringArr.length - 1) {
                    byteString = ByteString.copyFromUtf8(Integer.toString(i + 1));
                }
                completableFuture.complete(BeamFnApi.StateResponse.newBuilder().setId(builder.getId()).setGet(BeamFnApi.StateGetResponse.newBuilder().setData(byteStringArr[i]).setContinuationToken(byteString)).build());
            };
        }

        private void testFetch(ByteString... byteStringArr) {
            StateFetchingIterators.LazyBlockingStateFetchingIterator lazyBlockingStateFetchingIterator = new StateFetchingIterators.LazyBlockingStateFetchingIterator(fakeStateClient(new AtomicInteger(), byteStringArr), BeamFnApi.StateRequest.getDefaultInstance());
            Assert.assertEquals(0L, r0.get());
            Assert.assertArrayEquals(byteStringArr, Iterators.toArray(lazyBlockingStateFetchingIterator, Object.class));
        }

        @Test
        public void testEmptyValues() throws Exception {
            testFetchValues(VarIntCoder.of(), new Integer[0]);
        }

        @Test
        public void testOneValue() throws Exception {
            testFetchValues(VarIntCoder.of(), 4);
        }

        @Test
        public void testManyValues() throws Exception {
            testFetchValues(VarIntCoder.of(), 11, 37, 389, 5077);
        }

        private <T> void testFetchValues(Coder<T> coder, T... tArr) {
            List list = (List) Arrays.stream(tArr).map(obj -> {
                try {
                    return CoderUtils.encodeToByteArray(coder, obj);
                } catch (CoderException e) {
                    throw new RuntimeException(e);
                }
            }).map(ByteString::copyFrom).collect(Collectors.toList());
            Iterable readAllAndDecodeStartingFrom = StateFetchingIterators.readAllAndDecodeStartingFrom(fakeStateClient(new AtomicInteger(), (ByteString[]) Iterables.toArray(list, ByteString.class)), BeamFnApi.StateRequest.getDefaultInstance(), coder);
            Assert.assertEquals(0L, r0.get());
            Iterator<T> it = readAllAndDecodeStartingFrom.iterator();
            Assert.assertEquals(0L, r0.get());
            if (it.hasNext()) {
                it.next();
            }
            Assert.assertEquals(1L, r0.get());
            Iterator<T> it2 = readAllAndDecodeStartingFrom.iterator();
            Assert.assertEquals(1L, r0.get());
            if (it2.hasNext()) {
                it2.next();
            }
            Assert.assertEquals(1L, r0.get());
            Assert.assertArrayEquals(tArr, Iterables.toArray(readAllAndDecodeStartingFrom, Object.class));
        }
    }
}
