/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataTimeBasedBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class BeamFnDataSizeBasedBufferingOutboundObserverTest {
    private static final LogicalEndpoint OUTPUT_LOCATION = LogicalEndpoint.of((String)"777L", (String)"555L");
    private static final Coder<WindowedValue<byte[]>> CODER = LengthPrefixCoder.of((Coder)WindowedValue.getValueOnlyCoder((Coder)ByteArrayCoder.of()));

    @Test
    public void testWithDefaultBuffer() throws Exception {
        ArrayList<BeamFnApi.Elements> values = new ArrayList<BeamFnApi.Elements>();
        AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
        BeamFnDataSizeBasedBufferingOutboundObserver consumer = BeamFnDataBufferingOutboundObserver.forLocation((PipelineOptions)PipelineOptionsFactory.create(), (LogicalEndpoint)OUTPUT_LOCATION, CODER, (StreamObserver)TestStreams.withOnNext(this.addToValuesConsumer(values)).withOnCompleted(this.setBooleanToTrue(onCompletedWasCalled)).build());
        Assert.assertFalse((boolean)(consumer instanceof BeamFnDataTimeBasedBufferingOutboundObserver));
        consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[999950]));
        Assert.assertThat(values, (Matcher)Matchers.empty());
        consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[50]));
        Assert.assertEquals((Object)BeamFnDataSizeBasedBufferingOutboundObserverTest.messageWithData(new byte[999950], new byte[50]), (Object)Iterables.get(values, (int)0));
        consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[999950]));
        Assert.assertEquals((long)1L, (long)values.size());
        consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[50]));
        Assert.assertEquals((Object)BeamFnDataSizeBasedBufferingOutboundObserverTest.messageWithData(new byte[999950], new byte[50]), (Object)Iterables.get(values, (int)1));
        consumer.close();
        Assert.assertEquals((Object)BeamFnDataSizeBasedBufferingOutboundObserverTest.messageWithData(new byte[0][]), (Object)Iterables.get(values, (int)2));
        try {
            consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[999950]));
            Assert.fail((String)"Writing after close should be prohibited.");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        try {
            consumer.close();
            Assert.fail((String)"Closing twice should be prohibited.");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testConfiguredBufferLimit() throws Exception {
        ArrayList<BeamFnApi.Elements> values = new ArrayList<BeamFnApi.Elements>();
        AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_size_limit=100"));
        BeamFnDataSizeBasedBufferingOutboundObserver consumer = BeamFnDataBufferingOutboundObserver.forLocation((PipelineOptions)options, (LogicalEndpoint)OUTPUT_LOCATION, CODER, (StreamObserver)TestStreams.withOnNext(this.addToValuesConsumer(values)).withOnCompleted(this.setBooleanToTrue(onCompletedWasCalled)).build());
        consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[51]));
        Assert.assertThat(values, (Matcher)Matchers.empty());
        consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[49]));
        Assert.assertEquals((Object)BeamFnDataSizeBasedBufferingOutboundObserverTest.messageWithData(new byte[51], new byte[49]), (Object)Iterables.get(values, (int)0));
        consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[1]));
        consumer.close();
        Assert.assertEquals((Object)BeamFnApi.Elements.newBuilder((BeamFnApi.Elements)BeamFnDataSizeBasedBufferingOutboundObserverTest.messageWithData(new byte[][]{new byte[1]})).addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(OUTPUT_LOCATION.getInstructionId()).setTransformId(OUTPUT_LOCATION.getTransformId())).build(), (Object)Iterables.get(values, (int)1));
    }

    static BeamFnApi.Elements messageWithData(byte[] ... datum) throws IOException {
        ByteString.Output output = ByteString.newOutput();
        for (byte[] data : datum) {
            CODER.encode((Object)WindowedValue.valueInGlobalWindow((Object)data), (OutputStream)output);
        }
        return BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(OUTPUT_LOCATION.getInstructionId()).setTransformId(OUTPUT_LOCATION.getTransformId()).setData(output.toByteString())).build();
    }

    private Consumer<BeamFnApi.Elements> addToValuesConsumer(Collection<BeamFnApi.Elements> values) {
        return values::add;
    }

    private Runnable setBooleanToTrue(AtomicBoolean onCompletedWasCalled) {
        return () -> onCompletedWasCalled.set(true);
    }
}

