package org.apache.beam.sdk.fn.data;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link BeamFnDataBufferingOutboundObserver}.
 *
 * <p>The tests feed byte-array payloads into the observer and inspect the {@link
 * BeamFnApi.Elements} messages that reach the outbound stream, checking when a message is emitted
 * and what it contains.
 */
@RunWith(JUnit4.class)
public class BeamFnDataBufferingOutboundObserverTest {
    private static final LogicalEndpoint OUTPUT_LOCATION =
            LogicalEndpoint.of(
                    "777L",
                    BeamFnApi.Target.newBuilder()
                            .setPrimitiveTransformReference("555L")
                            .setName("Test")
                            .build());
    private static final Coder<WindowedValue<byte[]>> CODER =
            LengthPrefixCoder.of(WindowedValue.getValueOnlyCoder(ByteArrayCoder.of()));

    /**
     * Exercises an observer created without an explicit buffer limit: output is withheld after the
     * first payload, emitted after the second, the same holds for a second pair of payloads, and
     * the observer rejects use after {@code close()}.
     *
     * @throws Exception if encoding a payload or closing the observer fails
     */
    @Test
    public void testWithDefaultBuffer() throws Exception {
        Collection<BeamFnApi.Elements> values = new ArrayList<>();
        AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
        // NOTE(review): declared as a raw type because the generic parameterization of
        // BeamFnDataBufferingOutboundObserver is not visible from this file — parameterize it
        // once confirmed.
        BeamFnDataBufferingOutboundObserver consumer =
                BeamFnDataBufferingOutboundObserver.forLocation(
                        OUTPUT_LOCATION,
                        CODER,
                        TestStreams.withOnNext(addToValuesConsumer(values))
                                .withOnCompleted(setBooleanToTrue(onCompletedWasCalled))
                                .build());

        // Nothing may be emitted after the first payload.
        // (presumably 999950 bytes stays just under the default buffer limit — TODO confirm)
        consumer.accept(WindowedValue.valueInGlobalWindow(new byte[999950]));
        Assert.assertThat(values, Matchers.empty());

        // The second payload must trigger a single message carrying both elements.
        consumer.accept(WindowedValue.valueInGlobalWindow(new byte[50]));
        Assert.assertEquals(
                messageWithData(new byte[999950], new byte[50]), Iterables.get(values, 0));

        // After that emission the buffer starts over: again nothing new after one payload.
        consumer.accept(WindowedValue.valueInGlobalWindow(new byte[999950]));
        Assert.assertEquals(1, values.size());

        // ...and again a message once the second payload arrives.
        consumer.accept(WindowedValue.valueInGlobalWindow(new byte[50]));
        Assert.assertEquals(
                messageWithData(new byte[999950], new byte[50]), Iterables.get(values, 1));

        // Closing with nothing buffered must produce a message with no encoded elements.
        consumer.close();
        Assert.assertEquals(messageWithData(), Iterables.get(values, 2));

        // Writing to a closed observer must be rejected.
        try {
            consumer.accept(WindowedValue.valueInGlobalWindow(new byte[999950]));
            Assert.fail("Writing after close should be prohibited.");
        } catch (IllegalStateException expected) {
            // Expected: the observer has already been closed.
        }

        // Closing a second time must be rejected as well.
        try {
            consumer.close();
            Assert.fail("Closing twice should be prohibited.");
        } catch (IllegalStateException expected) {
            // Expected: the observer has already been closed.
        }
    }

    /**
     * Exercises an observer created with an explicit buffer limit of 100: output is withheld after
     * a 51-byte payload, emitted after a further 49-byte payload, and {@code close()} sends the
     * remaining element together with an extra data entry that has no payload.
     *
     * @throws Exception if encoding a payload or closing the observer fails
     */
    @Test
    public void testConfiguredBufferLimit() throws Exception {
        Collection<BeamFnApi.Elements> values = new ArrayList<>();
        AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
        // NOTE(review): raw type for the same reason as in testWithDefaultBuffer.
        BeamFnDataBufferingOutboundObserver consumer =
                BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(
                        100,
                        OUTPUT_LOCATION,
                        CODER,
                        TestStreams.withOnNext(addToValuesConsumer(values))
                                .withOnCompleted(setBooleanToTrue(onCompletedWasCalled))
                                .build());

        // Nothing may be emitted after the first payload.
        consumer.accept(WindowedValue.valueInGlobalWindow(new byte[51]));
        Assert.assertThat(values, Matchers.empty());

        // The second payload must trigger a single message carrying both elements.
        consumer.accept(WindowedValue.valueInGlobalWindow(new byte[49]));
        Assert.assertEquals(messageWithData(new byte[51], new byte[49]), Iterables.get(values, 0));

        // Closing with one element buffered must send that element and a second, payload-less
        // data entry in the same message.
        consumer.accept(WindowedValue.valueInGlobalWindow(new byte[1]));
        consumer.close();
        BeamFnApi.Elements expectedFinalMessage =
                BeamFnApi.Elements.newBuilder(messageWithData(new byte[1]))
                        .addData(
                                BeamFnApi.Elements.Data.newBuilder()
                                        .setInstructionReference(OUTPUT_LOCATION.getInstructionId())
                                        .setTarget(OUTPUT_LOCATION.getTarget()))
                        .build();
        Assert.assertEquals(expectedFinalMessage, Iterables.get(values, 1));
    }

    /**
     * Builds the expected {@link BeamFnApi.Elements} message for {@link #OUTPUT_LOCATION}.
     *
     * <p>Each payload is wrapped with {@code WindowedValue.valueInGlobalWindow} and encoded with
     * {@link #CODER}; the encodings are concatenated into the single data entry of the message.
     *
     * @param datum payloads to encode, in order; may be empty, which yields a data entry whose
     *     payload is empty
     * @return a message containing exactly one data entry addressed to {@link #OUTPUT_LOCATION}
     * @throws IOException if encoding a payload fails
     */
    private static BeamFnApi.Elements messageWithData(byte[]... datum) throws IOException {
        ByteString.Output encoded = ByteString.newOutput();
        for (byte[] payload : datum) {
            CODER.encode(WindowedValue.valueInGlobalWindow(payload), encoded);
        }
        return BeamFnApi.Elements.newBuilder()
                .addData(
                        BeamFnApi.Elements.Data.newBuilder()
                                .setInstructionReference(OUTPUT_LOCATION.getInstructionId())
                                .setTarget(OUTPUT_LOCATION.getTarget())
                                .setData(encoded.toByteString()))
                .build();
    }

    /**
     * Returns a consumer that appends every message it receives to {@code values}.
     *
     * @param values destination collection; must not be null
     * @return a consumer backed by {@code values}
     * @throws NullPointerException if {@code values} is null
     */
    private Consumer<BeamFnApi.Elements> addToValuesConsumer(Collection<BeamFnApi.Elements> values) {
        Objects.requireNonNull(values, "values");
        return values::add;
    }

    /**
     * Returns a runnable that sets {@code atomicBoolean} to {@code true} when run.
     *
     * @param atomicBoolean flag to set
     * @return a runnable that sets the flag
     */
    private Runnable setBooleanToTrue(AtomicBoolean atomicBoolean) {
        return () -> atomicBoolean.set(true);
    }
}
