package org.apache.beam.sdk.fn.stream;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.stream.DataStreams;
import org.apache.beam.sdk.fn.stream.PrefetchableIteratorsTest;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsCollectionWithSize;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(Enclosed.class)
/* loaded from: input_file:org/apache/beam/sdk/fn/stream/DataStreamsTest.class */
public class DataStreamsTest {

    @RunWith(JUnit4.class)
    /* loaded from: input_file:org/apache/beam/sdk/fn/stream/DataStreamsTest$DataStreamDecoderTest.class */
    public static class DataStreamDecoderTest {

        /**
         * JUnit rule for declaring that a test is expected to end with an exception. It starts out
         * expecting none.
         */
        @Rule
        public ExpectedException thrown = ExpectedException.none();

        /** Runs the decoder round-trip check with a string coder and no expected elements. */
        @Test
        public void testEmptyInputStream() throws Exception {
            String[] noElements = {};
            testDecoderWith(StringUtf8Coder.of(), noElements);
        }

        /** Runs the decoder round-trip check with four strings of increasing length. */
        @Test
        public void testNonEmptyInputStream() throws Exception {
            String[] elements = {"A", "BC", "DEF", "GHIJ"};
            testDecoderWith(StringUtf8Coder.of(), elements);
        }

        /**
         * Runs the decoder round-trip check with a coder whose elements encode to zero bytes.
         *
         * <p>The test is skipped (via a JUnit assumption) if encoding a {@link GlobalWindow} writes
         * any bytes at all.
         */
        @Test
        public void testNonEmptyInputStreamWithZeroLengthEncoding() throws Exception {
            GlobalWindow.Coder windowCoder = GlobalWindow.Coder.INSTANCE;

            // Measure the encoded size without keeping the bytes around.
            CountingOutputStream byteCounter =
                new CountingOutputStream(ByteStreams.nullOutputStream());
            windowCoder.encode(GlobalWindow.INSTANCE, byteCounter);
            long encodedSize = byteCounter.getCount();
            Assume.assumeTrue(encodedSize == 0);

            testDecoderWith(windowCoder, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE);
        }

        /**
         * Checks how the decoder's {@code isReady()} / {@code prefetch()} calls interact with the
         * underlying chunk iterator.
         *
         * <p>The input is three chunks: two encoded strings, an empty chunk, and two more encoded
         * strings. The chunk iterator is kept in a local variable so that the number of prefetch
         * calls forwarded to it can be asserted after each step.
         */
        @Test
        public void testPrefetch() throws Exception {
            List<ByteString> encodings = new ArrayList<>();
            encodings.add(encode("A", "BC"));
            encodings.add(ByteString.EMPTY);
            encodings.add(encode("DEF", "GHIJ"));

            PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<ByteString> chunkIterator =
                new PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<>(encodings.iterator());
            DataStreams.DataStreamDecoder<String> decoder =
                new DataStreams.DataStreamDecoder<>(StringUtf8Coder.of(), chunkIterator);

            // Nothing has been prefetched yet.
            Assert.assertFalse(decoder.isReady());
            decoder.prefetch();
            Assert.assertTrue(decoder.isReady());
            Assert.assertEquals(1L, chunkIterator.getNumPrefetchCalls());

            // After consuming one element the decoder is still ready, and prefetching again must
            // not be forwarded to the chunk iterator.
            decoder.next();
            Assert.assertTrue(decoder.isReady());
            decoder.prefetch();
            Assert.assertEquals(1L, chunkIterator.getNumPrefetchCalls());

            // After consuming the second element the decoder is no longer ready.
            decoder.next();
            Assert.assertFalse(decoder.isReady());

            // One forwarded prefetch is not enough to become ready again (presumably because the
            // next chunk is the empty one); a second forwarded prefetch is.
            decoder.prefetch();
            Assert.assertEquals(2L, chunkIterator.getNumPrefetchCalls());
            Assert.assertFalse(decoder.isReady());
            decoder.prefetch();
            Assert.assertEquals(3L, chunkIterator.getNumPrefetchCalls());
            Assert.assertTrue(decoder.isReady());
        }

        /**
         * Checks that {@code decodeFromChunkBoundaryToChunkBoundary()} returns complete elements
         * only, even when an encoding is split across two chunks one byte before its end.
         */
        @Test
        public void testDecodeFromChunkBoundaryToChunkBoundary() throws Exception {
            ByteString twoElements = encode("B", "BigElementC");
            ByteString oneElement = encode("BigElementG");
            // Each of the two encodings above is split so that its last byte lands in its own chunk.
            ByteString[] chunks = {
                encode("A"),
                twoElements.substring(0, twoElements.size() - 1),
                twoElements.substring(twoElements.size() - 1),
                encode("D"),
                encode(new String[0]),
                encode("E", "F"),
                oneElement.substring(0, oneElement.size() - 1),
                oneElement.substring(oneElement.size() - 1)
            };
            DataStreams.DataStreamDecoder<String> decoder =
                new DataStreams.DataStreamDecoder<>(
                    StringUtf8Coder.of(),
                    new PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<>(
                        Iterators.forArray(chunks)));

            MatcherAssert.assertThat(
                decoder.decodeFromChunkBoundaryToChunkBoundary(), Matchers.contains("A"));
            MatcherAssert.assertThat(
                decoder.decodeFromChunkBoundaryToChunkBoundary(),
                Matchers.contains("B", "BigElementC"));
            MatcherAssert.assertThat(
                decoder.decodeFromChunkBoundaryToChunkBoundary(), Matchers.contains("D"));
            // The chunk produced by encoding no strings yields no elements.
            MatcherAssert.assertThat(
                decoder.decodeFromChunkBoundaryToChunkBoundary(), Matchers.is(Matchers.empty()));
            MatcherAssert.assertThat(
                decoder.decodeFromChunkBoundaryToChunkBoundary(), Matchers.contains("E", "F"));
            MatcherAssert.assertThat(
                decoder.decodeFromChunkBoundaryToChunkBoundary(),
                Matchers.contains("BigElementG"));
            Assert.assertFalse(decoder.hasNext());
        }

        /**
         * Encodes the given strings back to back with {@link StringUtf8Coder}.
         *
         * @param strArr the strings to encode, in order; may be empty
         * @return the concatenated encodings as a single {@link ByteString}
         * @throws IOException if the coder fails to write
         */
        private ByteString encode(String... strArr) throws IOException {
            ByteStringOutputStream sink = new ByteStringOutputStream();
            for (int i = 0; i < strArr.length; i++) {
                StringUtf8Coder.of().encode(strArr[i], sink);
            }
            return sink.toByteString();
        }

        /**
         * Encodes the expected elements with {@code coder} and hands the encoding to the
         * list-based overload three times: as a single chunk, preceded by an empty chunk, and
         * followed by an empty chunk.
         *
         * @param coder the coder used to encode the elements
         * @param tArr the elements to encode, which are also the expected decoding result
         * @throws IOException if encoding fails
         */
        private <T> void testDecoderWith(Coder<T> coder, T... tArr) throws IOException {
            ByteStringOutputStream encoded = new ByteStringOutputStream();
            for (T element : tArr) {
                int sizeBefore = encoded.size();
                coder.encode(element, encoded);
                boolean wroteNothing = encoded.size() == sizeBefore;
                if (wroteNothing) {
                    // Pad elements that encode to zero bytes with a single zero byte.
                    encoded.write(0);
                }
            }

            List<ByteString> singleChunk = Arrays.asList(encoded.toByteString());
            testDecoderWith(coder, tArr, singleChunk);

            List<ByteString> leadingEmptyChunk =
                Arrays.asList(ByteString.EMPTY, encoded.toByteString());
            testDecoderWith(coder, tArr, leadingEmptyChunk);

            List<ByteString> trailingEmptyChunk =
                Arrays.asList(encoded.toByteString(), ByteString.EMPTY);
            testDecoderWith(coder, tArr, trailingEmptyChunk);
        }

        /**
         * Decodes the given chunks and checks the result and the end-of-stream behaviour.
         *
         * <p>The decoder must yield exactly {@code tArr}, then report {@code hasNext() == false}
         * repeatedly, and finally throw {@link NoSuchElementException} from {@code next()}.
         *
         * <p>The exception is verified with a local try/catch rather than an
         * {@code ExpectedException} rule. With the rule, the exception would propagate out of this
         * helper and end the calling test, so a caller that invokes this helper several times in a
         * row would only ever run the first invocation.
         *
         * @param coder the coder used to decode elements
         * @param tArr the elements the decoder is expected to produce, in order
         * @param list the encoded input, split into chunks
         */
        private <T> void testDecoderWith(Coder<T> coder, T[] tArr, List<ByteString> list) {
            DataStreams.DataStreamDecoder<T> decoder =
                new DataStreams.DataStreamDecoder<>(
                    coder, PrefetchableIterators.maybePrefetchable(list.iterator()));

            Object[] decoded = Iterators.toArray(decoder, Object.class);
            Assert.assertArrayEquals(tArr, decoded);

            // Asking twice checks that the answer is stable once the input is exhausted.
            Assert.assertFalse(decoder.hasNext());
            Assert.assertFalse(decoder.hasNext());

            try {
                decoder.next();
                Assert.fail("Expected NoSuchElementException from next() on an exhausted decoder");
            } catch (NoSuchElementException expected) {
                // This is the required behaviour; return normally so callers can continue.
            }
        }
    }

    @RunWith(JUnit4.class)
    /* loaded from: input_file:org/apache/beam/sdk/fn/stream/DataStreamsTest$ElementDelimitedOutputStreamTest.class */
    public static class ElementDelimitedOutputStreamTest {
        /**
         * Checks that closing a stream to which nothing was written emits no chunks.
         *
         * <p>Emitted chunks are collected into a list via a method reference; the stream is
         * created with a chunk size argument of 3.
         */
        @Test
        public void testNothingWritten() throws Exception {
            List<ByteString> output = new ArrayList<>();
            DataStreams.ElementDelimitedOutputStream outputStream =
                new DataStreams.ElementDelimitedOutputStream(output::add, 3);
            outputStream.close();
            MatcherAssert.assertThat(output, IsCollectionWithSize.hasSize(0));
        }

        /**
         * Checks that delimiting five elements without writing any bytes emits five zero bytes,
         * split into a 3-byte chunk followed by a 2-byte chunk.
         *
         * <p>Emitted chunks are collected into a list via a method reference; the stream is
         * created with a chunk size argument of 3.
         */
        @Test
        public void testEmptyElementsArePadded() throws Exception {
            List<ByteString> output = new ArrayList<>();
            DataStreams.ElementDelimitedOutputStream outputStream =
                new DataStreams.ElementDelimitedOutputStream(output::add, 3);

            // Five elements, none of which has any content.
            for (int i = 0; i < 5; i++) {
                outputStream.delimitElement();
            }
            outputStream.close();

            MatcherAssert.assertThat(
                output,
                Matchers.contains(
                    ByteString.copyFrom(new byte[3]), ByteString.copyFrom(new byte[2])));
        }

        /**
         * Checks that three elements of 2, 6 and 1 bytes are emitted as three 3-byte chunks,
         * regardless of where the element boundaries fall.
         *
         * <p>Emitted chunks are collected into a list via a method reference; the stream is
         * created with a chunk size argument of 3.
         */
        @Test
        public void testNonEmptyElementsAreChunked() throws Exception {
            List<ByteString> output = new ArrayList<>();
            DataStreams.ElementDelimitedOutputStream outputStream =
                new DataStreams.ElementDelimitedOutputStream(output::add, 3);

            outputStream.write(new byte[] {1, 2});
            outputStream.delimitElement();
            outputStream.write(new byte[] {3, 4, 5, 6, 7, 8});
            outputStream.delimitElement();
            outputStream.write(9);
            outputStream.delimitElement();
            outputStream.close();

            MatcherAssert.assertThat(
                output,
                Matchers.contains(
                    ByteString.copyFrom(new byte[] {1, 2, 3}),
                    ByteString.copyFrom(new byte[] {4, 5, 6}),
                    ByteString.copyFrom(new byte[] {7, 8, 9})));
        }
    }
}
