/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserverTest;
import org.apache.beam.sdk.fn.data.BeamFnDataTimeBasedBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class BeamFnDataTimeBasedBufferingOutboundObserverTest {
    private static final LogicalEndpoint OUTPUT_LOCATION = LogicalEndpoint.of((String)"777L", (String)"555L");
    private static final Coder<WindowedValue<byte[]>> CODER = LengthPrefixCoder.of((Coder)WindowedValue.getValueOnlyCoder((Coder)ByteArrayCoder.of()));

    @Test
    public void testConfiguredTimeLimit() throws Exception {
        ArrayList values = new ArrayList();
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
        CountDownLatch waitForFlush = new CountDownLatch(1);
        BeamFnDataSizeBasedBufferingOutboundObserver consumer = BeamFnDataBufferingOutboundObserver.forLocation((PipelineOptions)options, (LogicalEndpoint)OUTPUT_LOCATION, CODER, (StreamObserver)TestStreams.withOnNext(e -> {
            values.add(e);
            waitForFlush.countDown();
        }).build());
        consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[1]));
        waitForFlush.await();
        Assert.assertEquals((Object)BeamFnDataSizeBasedBufferingOutboundObserverTest.messageWithData(new byte[][]{new byte[1]}), (Object)Iterables.get(values, (int)0));
    }

    @Test
    public void testConfiguredTimeLimitExceptionPropagation() throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
        BeamFnDataTimeBasedBufferingOutboundObserver consumer = (BeamFnDataTimeBasedBufferingOutboundObserver)BeamFnDataBufferingOutboundObserver.forLocation((PipelineOptions)options, (LogicalEndpoint)OUTPUT_LOCATION, CODER, (StreamObserver)TestStreams.withOnNext(e -> {
            throw new RuntimeException("");
        }).build());
        consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[1]));
        while (!consumer.flushFuture.isDone()) {
            Thread.sleep(1L);
        }
        try {
            consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[1]));
            Assert.fail();
        }
        catch (Exception exception) {
            // empty catch block
        }
        consumer = (BeamFnDataTimeBasedBufferingOutboundObserver)BeamFnDataBufferingOutboundObserver.forLocation((PipelineOptions)options, (LogicalEndpoint)OUTPUT_LOCATION, CODER, (StreamObserver)TestStreams.withOnNext(e -> {
            throw new RuntimeException("");
        }).build());
        consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[1]));
        while (!consumer.flushFuture.isDone()) {
            Thread.sleep(1L);
        }
        try {
            consumer.close();
            Assert.fail();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }
}

