/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class BeamFnDataOutboundAggregatorTest {
    private static final LogicalEndpoint DATA_OUTPUT_LOCATION = LogicalEndpoint.data((String)"777L", (String)"555L");
    private static final LogicalEndpoint TIMER_OUTPUT_LOCATION = LogicalEndpoint.timer((String)"999L", (String)"333L", (String)"111L");
    private static final Coder<byte[]> CODER = LengthPrefixCoder.of((Coder)ByteArrayCoder.of());
    private final LogicalEndpoint endpoint;

    @Parameterized.Parameters
    public static Collection<LogicalEndpoint> data() {
        return Arrays.asList(DATA_OUTPUT_LOCATION, TIMER_OUTPUT_LOCATION);
    }

    public BeamFnDataOutboundAggregatorTest(LogicalEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    @Test
    public void testWithDefaultBuffer() throws Exception {
        ArrayList values = new ArrayList();
        AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
        BeamFnDataOutboundAggregator aggregator = new BeamFnDataOutboundAggregator(PipelineOptionsFactory.create(), () -> ((LogicalEndpoint)this.endpoint).getInstructionId(), (StreamObserver)TestStreams.withOnNext(values::add).withOnCompleted(() -> onCompletedWasCalled.set(true)).build(), false);
        FnDataReceiver<byte[]> dataReceiver = this.registerOutputLocation(aggregator, this.endpoint, CODER);
        aggregator.start();
        dataReceiver.accept((Object)new byte[999950]);
        MatcherAssert.assertThat(values, (Matcher)Matchers.empty());
        dataReceiver.accept((Object)new byte[50]);
        Assert.assertEquals((Object)this.messageWithData(new byte[999950], new byte[50]), values.get(0));
        dataReceiver.accept((Object)new byte[999950]);
        Assert.assertEquals((long)1L, (long)values.size());
        dataReceiver.accept((Object)new byte[50]);
        Assert.assertEquals((Object)this.messageWithData(new byte[999950], new byte[50]), values.get(1));
        aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
        Assert.assertEquals((Object)this.endMessage(), values.get(2));
        aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
        Assert.assertEquals((Object)this.endMessage(), values.get(2));
    }

    @Test
    public void testConfiguredBufferLimit() throws Exception {
        ArrayList values = new ArrayList();
        AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_size_limit=100"));
        BeamFnDataOutboundAggregator aggregator = new BeamFnDataOutboundAggregator(options, () -> ((LogicalEndpoint)this.endpoint).getInstructionId(), (StreamObserver)TestStreams.withOnNext(values::add).withOnCompleted(() -> onCompletedWasCalled.set(true)).build(), false);
        FnDataReceiver<byte[]> dataReceiver = this.registerOutputLocation(aggregator, this.endpoint, CODER);
        aggregator.start();
        dataReceiver.accept((Object)new byte[51]);
        MatcherAssert.assertThat(values, (Matcher)Matchers.empty());
        dataReceiver.accept((Object)new byte[49]);
        Assert.assertEquals((Object)this.messageWithData(new byte[51], new byte[49]), values.get(0));
        BeamFnDataOutboundAggregator.Receiver receiver = this.endpoint.isTimer() ? (BeamFnDataOutboundAggregator.Receiver)Iterables.getOnlyElement(aggregator.outputTimersReceivers.values()) : (BeamFnDataOutboundAggregator.Receiver)Iterables.getOnlyElement(aggregator.outputDataReceivers.values());
        Assert.assertEquals((long)0L, (long)receiver.bufferedSize());
        Assert.assertEquals((long)102L, (long)receiver.getByteCount());
        Assert.assertEquals((long)2L, (long)receiver.getElementCount());
        dataReceiver.accept((Object)new byte[1]);
        aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
        Assert.assertEquals((long)0L, (long)receiver.bufferedSize());
        Assert.assertEquals((long)0L, (long)receiver.getByteCount());
        Assert.assertEquals((long)0L, (long)receiver.getElementCount());
        BeamFnApi.Elements.Builder builder = this.messageWithDataBuilder(new byte[][]{new byte[1]});
        if (this.endpoint.isTimer()) {
            builder.addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setTimerFamilyId(this.endpoint.getTimerFamilyId()).setIsLast(true));
        } else {
            builder.addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setIsLast(true));
        }
        Assert.assertEquals((Object)builder.build(), values.get(1));
    }

    @Test
    public void testConfiguredTimeLimit() throws Exception {
        ArrayList values = new ArrayList();
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
        CountDownLatch waitForFlush = new CountDownLatch(1);
        BeamFnDataOutboundAggregator aggregator = new BeamFnDataOutboundAggregator(options, () -> ((LogicalEndpoint)this.endpoint).getInstructionId(), (StreamObserver)TestStreams.withOnNext(e -> {
            values.add(e);
            waitForFlush.countDown();
        }).build(), false);
        FnDataReceiver<byte[]> dataReceiver = this.registerOutputLocation(aggregator, this.endpoint, CODER);
        aggregator.start();
        dataReceiver.accept((Object)new byte[1]);
        waitForFlush.await();
        Assert.assertEquals((Object)this.messageWithData(new byte[][]{new byte[1]}), values.get(0));
    }

    @Test
    public void testConfiguredTimeLimitExceptionPropagation() throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
        BeamFnDataOutboundAggregator aggregator = new BeamFnDataOutboundAggregator(options, () -> ((LogicalEndpoint)this.endpoint).getInstructionId(), (StreamObserver)TestStreams.withOnNext(e -> {
            throw new RuntimeException("");
        }).build(), false);
        FnDataReceiver<byte[]> dataReceiver = this.registerOutputLocation(aggregator, this.endpoint, CODER);
        aggregator.start();
        dataReceiver.accept((Object)new byte[1]);
        while (!aggregator.flushFuture.isDone()) {
            Thread.sleep(1L);
        }
        try {
            dataReceiver.accept((Object)new byte[1]);
            Assert.fail();
        }
        catch (Exception exception) {
            // empty catch block
        }
        aggregator = new BeamFnDataOutboundAggregator(options, () -> ((LogicalEndpoint)this.endpoint).getInstructionId(), (StreamObserver)TestStreams.withOnNext(e -> {
            throw new RuntimeException("");
        }).build(), false);
        dataReceiver = this.registerOutputLocation(aggregator, this.endpoint, CODER);
        aggregator.start();
        dataReceiver.accept((Object)new byte[1]);
        while (!aggregator.flushFuture.isDone()) {
            Thread.sleep(1L);
        }
        try {
            aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
            Assert.fail();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testConfiguredBufferLimitMultipleEndpoints() throws Exception {
        ArrayList values = new ArrayList();
        AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_size_limit=100"));
        BeamFnDataOutboundAggregator aggregator = new BeamFnDataOutboundAggregator(options, () -> ((LogicalEndpoint)this.endpoint).getInstructionId(), (StreamObserver)TestStreams.withOnNext(values::add).withOnCompleted(() -> onCompletedWasCalled.set(true)).build(), false);
        LogicalEndpoint additionalEndpoint = LogicalEndpoint.data((String)this.endpoint.getInstructionId(), (String)("additional:" + this.endpoint.getTransformId()));
        FnDataReceiver<byte[]> dataReceiver = this.registerOutputLocation(aggregator, this.endpoint, CODER);
        FnDataReceiver<byte[]> additionalDataReceiver = this.registerOutputLocation(aggregator, additionalEndpoint, CODER);
        aggregator.start();
        dataReceiver.accept((Object)new byte[51]);
        MatcherAssert.assertThat(values, (Matcher)Matchers.empty());
        additionalDataReceiver.accept((Object)new byte[49]);
        this.checkEqualInAnyOrder(this.messageWithDataBuilder(new byte[][]{new byte[51]}).mergeFrom(this.messageWithDataBuilder(additionalEndpoint, (byte[][])new byte[][]{new byte[49]}).build()).build(), (BeamFnApi.Elements)values.get(0));
        dataReceiver.accept((Object)new byte[1]);
        aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
        BeamFnApi.Elements.Builder builder = this.messageWithDataBuilder(new byte[][]{new byte[1]});
        if (this.endpoint.isTimer()) {
            builder.addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setTimerFamilyId(this.endpoint.getTimerFamilyId()).setIsLast(true));
        } else {
            builder.addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setIsLast(true));
        }
        builder.addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(additionalEndpoint.getInstructionId()).setTransformId(additionalEndpoint.getTransformId()).setIsLast(true));
        this.checkEqualInAnyOrder(builder.build(), (BeamFnApi.Elements)values.get(1));
    }

    private void checkEqualInAnyOrder(BeamFnApi.Elements first, BeamFnApi.Elements second) {
        MatcherAssert.assertThat((Object)first.getDataList(), (Matcher)Matchers.containsInAnyOrder((Object[])second.getDataList().toArray()));
        MatcherAssert.assertThat((Object)first.getTimersList(), (Matcher)Matchers.containsInAnyOrder((Object[])second.getTimersList().toArray()));
    }

    BeamFnApi.Elements.Builder messageWithDataBuilder(byte[] ... datum) throws IOException {
        return this.messageWithDataBuilder(this.endpoint, datum);
    }

    BeamFnApi.Elements.Builder messageWithDataBuilder(LogicalEndpoint endpoint, byte[] ... datum) throws IOException {
        ByteStringOutputStream output = new ByteStringOutputStream();
        for (byte[] data : datum) {
            CODER.encode((Object)data, (OutputStream)output);
        }
        if (endpoint.isTimer()) {
            return BeamFnApi.Elements.newBuilder().addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(endpoint.getInstructionId()).setTransformId(endpoint.getTransformId()).setTimerFamilyId(endpoint.getTimerFamilyId()).setTimers(output.toByteString()));
        }
        return BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(endpoint.getInstructionId()).setTransformId(endpoint.getTransformId()).setData(output.toByteString()));
    }

    BeamFnApi.Elements messageWithData(byte[] ... datum) throws IOException {
        return this.messageWithDataBuilder(datum).build();
    }

    BeamFnApi.Elements endMessage() throws IOException {
        BeamFnApi.Elements.Builder builder = this.messageWithDataBuilder(new byte[0][]);
        if (this.endpoint.isTimer()) {
            builder.getTimersBuilder(0).setIsLast(true);
        } else {
            builder.getDataBuilder(0).setIsLast(true);
        }
        return builder.build();
    }

    <T> FnDataReceiver<T> registerOutputLocation(BeamFnDataOutboundAggregator aggregator, LogicalEndpoint endpoint, Coder<T> coder) {
        if (endpoint.isTimer()) {
            return aggregator.registerOutputTimersLocation(endpoint.getTransformId(), endpoint.getTimerFamilyId(), coder);
        }
        return aggregator.registerOutputDataLocation(endpoint.getTransformId(), coder);
    }
}

