/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataTimeBasedBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class BeamFnDataSizeBasedBufferingOutboundObserverTest {
    private static final @UnknownKeyFor @NonNull @Initialized LogicalEndpoint DATA_OUTPUT_LOCATION = LogicalEndpoint.data((String)"777L", (String)"555L");
    private static final @UnknownKeyFor @NonNull @Initialized LogicalEndpoint TIMER_OUTPUT_LOCATION = LogicalEndpoint.timer((String)"999L", (String)"333L", (String)"111L");
    private static final @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> CODER = LengthPrefixCoder.of((Coder)ByteArrayCoder.of());
    private final @UnknownKeyFor @NonNull @Initialized LogicalEndpoint endpoint;

    @Parameterized.Parameters
    public static @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized LogicalEndpoint> data() {
        return Arrays.asList(DATA_OUTPUT_LOCATION, TIMER_OUTPUT_LOCATION);
    }

    public BeamFnDataSizeBasedBufferingOutboundObserverTest(@UnknownKeyFor @NonNull @Initialized LogicalEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    @Test
    public void testWithDefaultBuffer() throws @UnknownKeyFor @NonNull @Initialized Exception {
        ArrayList values = new ArrayList();
        AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
        BeamFnDataSizeBasedBufferingOutboundObserver consumer = BeamFnDataBufferingOutboundObserver.forLocation((PipelineOptions)PipelineOptionsFactory.create(), (LogicalEndpoint)this.endpoint, CODER, (StreamObserver)TestStreams.withOnNext(values::add).withOnCompleted(() -> onCompletedWasCalled.set(true)).build());
        Assert.assertFalse((boolean)(consumer instanceof BeamFnDataTimeBasedBufferingOutboundObserver));
        consumer.accept((Object)new byte[999950]);
        Assert.assertThat(values, (Matcher)Matchers.empty());
        consumer.accept((Object)new byte[50]);
        Assert.assertEquals((Object)this.messageWithData(new byte[999950], new byte[50]), values.get(0));
        consumer.accept((Object)new byte[999950]);
        Assert.assertEquals((long)1L, (long)values.size());
        consumer.accept((Object)new byte[50]);
        Assert.assertEquals((Object)this.messageWithData(new byte[999950], new byte[50]), values.get(1));
        consumer.close();
        Assert.assertEquals((Object)this.endMessage(), values.get(2));
        try {
            consumer.accept((Object)new byte[999950]);
            Assert.fail((String)"Writing after close should be prohibited.");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        consumer.close();
    }

    @Test
    public void testConfiguredBufferLimit() throws @UnknownKeyFor @NonNull @Initialized Exception {
        ArrayList values = new ArrayList();
        AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_size_limit=100"));
        BeamFnDataSizeBasedBufferingOutboundObserver consumer = BeamFnDataBufferingOutboundObserver.forLocation((PipelineOptions)options, (LogicalEndpoint)this.endpoint, CODER, (StreamObserver)TestStreams.withOnNext(values::add).withOnCompleted(() -> onCompletedWasCalled.set(true)).build());
        consumer.accept((Object)new byte[51]);
        Assert.assertThat(values, (Matcher)Matchers.empty());
        consumer.accept((Object)new byte[49]);
        Assert.assertEquals((Object)this.messageWithData(new byte[51], new byte[49]), values.get(0));
        consumer.accept((Object)new byte[1]);
        consumer.close();
        BeamFnApi.Elements.Builder builder = this.messageWithDataBuilder(new byte[][]{new byte[1]});
        if (this.endpoint.isTimer()) {
            builder.addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setTimerFamilyId(this.endpoint.getTimerFamilyId()).setIsLast(true));
        } else {
            builder.addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setIsLast(true));
        }
        Assert.assertEquals((Object)builder.build(), values.get(1));
    }

    // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements.Builder messageWithDataBuilder(byte[] ... datum) throws @UnknownKeyFor @NonNull @Initialized IOException {
        ByteString.Output output = ByteString.newOutput();
        for (byte[] data : datum) {
            CODER.encode((Object)data, (OutputStream)output);
        }
        if (this.endpoint.isTimer()) {
            return BeamFnApi.Elements.newBuilder().addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setTimerFamilyId(this.endpoint.getTimerFamilyId()).setTimers(output.toByteString()));
        }
        return BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setData(output.toByteString()));
    }

    // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements messageWithData(byte[] ... datum) throws @UnknownKeyFor @NonNull @Initialized IOException {
        return this.messageWithDataBuilder(datum).build();
    }

    // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements endMessage() throws @UnknownKeyFor @NonNull @Initialized IOException {
        BeamFnApi.Elements.Builder builder = this.messageWithDataBuilder(new byte[0][]);
        if (this.endpoint.isTimer()) {
            builder.getTimersBuilder(0).setIsLast(true);
        } else {
            builder.getDataBuilder(0).setIsLast(true);
        }
        return builder.build();
    }
}

