package org.apache.beam.sdk.fn.data;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.class */
public class BeamFnDataOutboundAggregatorTest {
    private static final LogicalEndpoint DATA_OUTPUT_LOCATION = LogicalEndpoint.data("777L", "555L");
    private static final LogicalEndpoint TIMER_OUTPUT_LOCATION = LogicalEndpoint.timer("999L", "333L", "111L");
    private static final Coder<byte[]> CODER = LengthPrefixCoder.of(ByteArrayCoder.of());
    private final LogicalEndpoint endpoint;

    @Parameterized.Parameters
    public static Collection<LogicalEndpoint> data() {
        return Arrays.asList(DATA_OUTPUT_LOCATION, TIMER_OUTPUT_LOCATION);
    }

    public BeamFnDataOutboundAggregatorTest(LogicalEndpoint logicalEndpoint) {
        this.endpoint = logicalEndpoint;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v10, types: [byte[], byte[][]] */
    /* JADX WARN: Type inference failed for: r1v21, types: [byte[], byte[][]] */
    @Test
    public void testWithDefaultBuffer() throws Exception {
        ArrayList arrayList = new ArrayList();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        PipelineOptions create = PipelineOptionsFactory.create();
        LogicalEndpoint logicalEndpoint = this.endpoint;
        Objects.requireNonNull(logicalEndpoint);
        Supplier supplier = logicalEndpoint::getInstructionId;
        Objects.requireNonNull(arrayList);
        BeamFnDataOutboundAggregator beamFnDataOutboundAggregator = new BeamFnDataOutboundAggregator(create, supplier, TestStreams.withOnNext((v1) -> {
            r4.add(v1);
        }).withOnCompleted(() -> {
            atomicBoolean.set(true);
        }).build(), false);
        FnDataReceiver registerOutputLocation = registerOutputLocation(beamFnDataOutboundAggregator, this.endpoint, CODER);
        beamFnDataOutboundAggregator.start();
        registerOutputLocation.accept(new byte[999950]);
        MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        registerOutputLocation.accept(new byte[50]);
        Assert.assertEquals(messageWithData(new byte[]{new byte[999950], new byte[50]}), arrayList.get(0));
        registerOutputLocation.accept(new byte[999950]);
        Assert.assertEquals(1L, arrayList.size());
        registerOutputLocation.accept(new byte[50]);
        Assert.assertEquals(messageWithData(new byte[]{new byte[999950], new byte[50]}), arrayList.get(1));
        beamFnDataOutboundAggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
        Assert.assertEquals(endMessage(), arrayList.get(2));
        beamFnDataOutboundAggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
        Assert.assertEquals(endMessage(), arrayList.get(2));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v14, types: [byte[], byte[][]] */
    /* JADX WARN: Type inference failed for: r1v34, types: [byte[], byte[][]] */
    @Test
    public void testConfiguredBufferLimit() throws Exception {
        ArrayList arrayList = new ArrayList();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        PipelineOptions create = PipelineOptionsFactory.create();
        create.as(ExperimentalOptions.class).setExperiments(Arrays.asList("data_buffer_size_limit=100"));
        LogicalEndpoint logicalEndpoint = this.endpoint;
        Objects.requireNonNull(logicalEndpoint);
        Supplier supplier = logicalEndpoint::getInstructionId;
        Objects.requireNonNull(arrayList);
        BeamFnDataOutboundAggregator beamFnDataOutboundAggregator = new BeamFnDataOutboundAggregator(create, supplier, TestStreams.withOnNext((v1) -> {
            r4.add(v1);
        }).withOnCompleted(() -> {
            atomicBoolean.set(true);
        }).build(), false);
        FnDataReceiver registerOutputLocation = registerOutputLocation(beamFnDataOutboundAggregator, this.endpoint, CODER);
        beamFnDataOutboundAggregator.start();
        registerOutputLocation.accept(new byte[51]);
        MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        registerOutputLocation.accept(new byte[49]);
        Assert.assertEquals(messageWithData(new byte[]{new byte[51], new byte[49]}), arrayList.get(0));
        BeamFnDataOutboundAggregator.Receiver receiver = this.endpoint.isTimer() ? (BeamFnDataOutboundAggregator.Receiver) Iterables.getOnlyElement(beamFnDataOutboundAggregator.outputTimersReceivers.values()) : (BeamFnDataOutboundAggregator.Receiver) Iterables.getOnlyElement(beamFnDataOutboundAggregator.outputDataReceivers.values());
        Assert.assertEquals(0L, receiver.bufferedSize());
        Assert.assertEquals(102L, receiver.getByteCount());
        Assert.assertEquals(2L, receiver.getElementCount());
        registerOutputLocation.accept(new byte[1]);
        beamFnDataOutboundAggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
        Assert.assertEquals(0L, receiver.bufferedSize());
        Assert.assertEquals(0L, receiver.getByteCount());
        Assert.assertEquals(0L, receiver.getElementCount());
        BeamFnApi.Elements.Builder messageWithDataBuilder = messageWithDataBuilder(new byte[]{new byte[1]});
        if (this.endpoint.isTimer()) {
            messageWithDataBuilder.addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setTimerFamilyId(this.endpoint.getTimerFamilyId()).setIsLast(true));
        } else {
            messageWithDataBuilder.addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setIsLast(true));
        }
        Assert.assertEquals(messageWithDataBuilder.m675build(), arrayList.get(1));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v11, types: [byte[], byte[][]] */
    @Test
    public void testConfiguredTimeLimit() throws Exception {
        ArrayList arrayList = new ArrayList();
        PipelineOptions create = PipelineOptionsFactory.create();
        create.as(ExperimentalOptions.class).setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        LogicalEndpoint logicalEndpoint = this.endpoint;
        Objects.requireNonNull(logicalEndpoint);
        BeamFnDataOutboundAggregator beamFnDataOutboundAggregator = new BeamFnDataOutboundAggregator(create, logicalEndpoint::getInstructionId, TestStreams.withOnNext(elements -> {
            arrayList.add(elements);
            countDownLatch.countDown();
        }).build(), false);
        FnDataReceiver registerOutputLocation = registerOutputLocation(beamFnDataOutboundAggregator, this.endpoint, CODER);
        beamFnDataOutboundAggregator.start();
        registerOutputLocation.accept(new byte[1]);
        countDownLatch.await();
        Assert.assertEquals(messageWithData(new byte[]{new byte[1]}), arrayList.get(0));
    }

    @Test
    public void testConfiguredTimeLimitExceptionPropagation() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.as(ExperimentalOptions.class).setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
        LogicalEndpoint logicalEndpoint = this.endpoint;
        Objects.requireNonNull(logicalEndpoint);
        BeamFnDataOutboundAggregator beamFnDataOutboundAggregator = new BeamFnDataOutboundAggregator(create, logicalEndpoint::getInstructionId, TestStreams.withOnNext(elements -> {
            throw new RuntimeException("");
        }).build(), false);
        FnDataReceiver registerOutputLocation = registerOutputLocation(beamFnDataOutboundAggregator, this.endpoint, CODER);
        beamFnDataOutboundAggregator.start();
        registerOutputLocation.accept(new byte[1]);
        while (!beamFnDataOutboundAggregator.flushFuture.isDone()) {
            Thread.sleep(1L);
        }
        try {
            registerOutputLocation.accept(new byte[1]);
            Assert.fail();
        } catch (Exception e) {
        }
        LogicalEndpoint logicalEndpoint2 = this.endpoint;
        Objects.requireNonNull(logicalEndpoint2);
        BeamFnDataOutboundAggregator beamFnDataOutboundAggregator2 = new BeamFnDataOutboundAggregator(create, logicalEndpoint2::getInstructionId, TestStreams.withOnNext(elements2 -> {
            throw new RuntimeException("");
        }).build(), false);
        FnDataReceiver registerOutputLocation2 = registerOutputLocation(beamFnDataOutboundAggregator2, this.endpoint, CODER);
        beamFnDataOutboundAggregator2.start();
        registerOutputLocation2.accept(new byte[1]);
        while (!beamFnDataOutboundAggregator2.flushFuture.isDone()) {
            Thread.sleep(1L);
        }
        try {
            beamFnDataOutboundAggregator2.sendOrCollectBufferedDataAndFinishOutboundStreams();
            Assert.fail();
        } catch (Exception e2) {
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v25, types: [byte[], byte[][]] */
    /* JADX WARN: Type inference failed for: r2v11, types: [byte[], byte[][]] */
    /* JADX WARN: Type inference failed for: r4v10, types: [byte[], byte[][]] */
    @Test
    public void testConfiguredBufferLimitMultipleEndpoints() throws Exception {
        ArrayList arrayList = new ArrayList();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        PipelineOptions create = PipelineOptionsFactory.create();
        create.as(ExperimentalOptions.class).setExperiments(Arrays.asList("data_buffer_size_limit=100"));
        LogicalEndpoint logicalEndpoint = this.endpoint;
        Objects.requireNonNull(logicalEndpoint);
        Supplier supplier = logicalEndpoint::getInstructionId;
        Objects.requireNonNull(arrayList);
        BeamFnDataOutboundAggregator beamFnDataOutboundAggregator = new BeamFnDataOutboundAggregator(create, supplier, TestStreams.withOnNext((v1) -> {
            r4.add(v1);
        }).withOnCompleted(() -> {
            atomicBoolean.set(true);
        }).build(), false);
        LogicalEndpoint data = LogicalEndpoint.data(this.endpoint.getInstructionId(), "additional:" + this.endpoint.getTransformId());
        FnDataReceiver registerOutputLocation = registerOutputLocation(beamFnDataOutboundAggregator, this.endpoint, CODER);
        FnDataReceiver registerOutputLocation2 = registerOutputLocation(beamFnDataOutboundAggregator, data, CODER);
        beamFnDataOutboundAggregator.start();
        registerOutputLocation.accept(new byte[51]);
        MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        registerOutputLocation2.accept(new byte[49]);
        checkEqualInAnyOrder(messageWithDataBuilder(new byte[]{new byte[51]}).mergeFrom(messageWithDataBuilder(data, new byte[]{new byte[49]}).m675build()).m675build(), (BeamFnApi.Elements) arrayList.get(0));
        registerOutputLocation.accept(new byte[1]);
        beamFnDataOutboundAggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
        BeamFnApi.Elements.Builder messageWithDataBuilder = messageWithDataBuilder(new byte[]{new byte[1]});
        if (this.endpoint.isTimer()) {
            messageWithDataBuilder.addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setTimerFamilyId(this.endpoint.getTimerFamilyId()).setIsLast(true));
        } else {
            messageWithDataBuilder.addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setIsLast(true));
        }
        messageWithDataBuilder.addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(data.getInstructionId()).setTransformId(data.getTransformId()).setIsLast(true));
        checkEqualInAnyOrder(messageWithDataBuilder.m675build(), (BeamFnApi.Elements) arrayList.get(1));
    }

    private void checkEqualInAnyOrder(BeamFnApi.Elements elements, BeamFnApi.Elements elements2) {
        MatcherAssert.assertThat(elements.getDataList(), (Matcher<? super List<BeamFnApi.Elements.Data>>) Matchers.containsInAnyOrder(elements2.getDataList().toArray()));
        MatcherAssert.assertThat(elements.getTimersList(), (Matcher<? super List<BeamFnApi.Elements.Timers>>) Matchers.containsInAnyOrder(elements2.getTimersList().toArray()));
    }

    BeamFnApi.Elements.Builder messageWithDataBuilder(byte[]... bArr) throws IOException {
        return messageWithDataBuilder(this.endpoint, bArr);
    }

    BeamFnApi.Elements.Builder messageWithDataBuilder(LogicalEndpoint logicalEndpoint, byte[]... bArr) throws IOException {
        ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
        for (byte[] bArr2 : bArr) {
            CODER.encode(bArr2, byteStringOutputStream);
        }
        return logicalEndpoint.isTimer() ? BeamFnApi.Elements.newBuilder().addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(logicalEndpoint.getInstructionId()).setTransformId(logicalEndpoint.getTransformId()).setTimerFamilyId(logicalEndpoint.getTimerFamilyId()).setTimers(byteStringOutputStream.toByteString())) : BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(logicalEndpoint.getInstructionId()).setTransformId(logicalEndpoint.getTransformId()).setData(byteStringOutputStream.toByteString()));
    }

    BeamFnApi.Elements messageWithData(byte[]... bArr) throws IOException {
        return messageWithDataBuilder(bArr).m675build();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v1, types: [byte[], byte[][]] */
    BeamFnApi.Elements endMessage() throws IOException {
        BeamFnApi.Elements.Builder messageWithDataBuilder = messageWithDataBuilder(new byte[0]);
        if (this.endpoint.isTimer()) {
            messageWithDataBuilder.getTimersBuilder(0).setIsLast(true);
        } else {
            messageWithDataBuilder.getDataBuilder(0).setIsLast(true);
        }
        return messageWithDataBuilder.m675build();
    }

    <T> FnDataReceiver<T> registerOutputLocation(BeamFnDataOutboundAggregator beamFnDataOutboundAggregator, LogicalEndpoint logicalEndpoint, Coder<T> coder) {
        return logicalEndpoint.isTimer() ? beamFnDataOutboundAggregator.registerOutputTimersLocation(logicalEndpoint.getTransformId(), logicalEndpoint.getTimerFamilyId(), coder) : beamFnDataOutboundAggregator.registerOutputDataLocation(logicalEndpoint.getTransformId(), coder);
    }
}
