/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataTimeBasedBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class BeamFnDataTimeBasedBufferingOutboundObserverTest {
    private static final @UnknownKeyFor @NonNull @Initialized LogicalEndpoint DATA_OUTPUT_LOCATION = LogicalEndpoint.data((String)"777L", (String)"555L");
    private static final @UnknownKeyFor @NonNull @Initialized LogicalEndpoint TIMER_OUTPUT_LOCATION = LogicalEndpoint.timer((String)"999L", (String)"333L", (String)"111L");
    private static final @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> CODER = LengthPrefixCoder.of((Coder)ByteArrayCoder.of());
    private final @UnknownKeyFor @NonNull @Initialized LogicalEndpoint endpoint;

    @Parameterized.Parameters
    public static @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized LogicalEndpoint> data() {
        return Arrays.asList(DATA_OUTPUT_LOCATION, TIMER_OUTPUT_LOCATION);
    }

    public BeamFnDataTimeBasedBufferingOutboundObserverTest(@UnknownKeyFor @NonNull @Initialized LogicalEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    @Test
    public void testConfiguredTimeLimit() throws @UnknownKeyFor @NonNull @Initialized Exception {
        ArrayList values = new ArrayList();
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
        CountDownLatch waitForFlush = new CountDownLatch(1);
        BeamFnDataSizeBasedBufferingOutboundObserver consumer = BeamFnDataBufferingOutboundObserver.forLocation((PipelineOptions)options, (LogicalEndpoint)this.endpoint, CODER, (StreamObserver)TestStreams.withOnNext(e -> {
            values.add(e);
            waitForFlush.countDown();
        }).build());
        consumer.accept((Object)new byte[1]);
        waitForFlush.await();
        Assert.assertEquals((Object)this.messageWithData(new byte[][]{new byte[1]}), values.get(0));
    }

    @Test
    public void testConfiguredTimeLimitExceptionPropagation() throws @UnknownKeyFor @NonNull @Initialized Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
        BeamFnDataTimeBasedBufferingOutboundObserver consumer = (BeamFnDataTimeBasedBufferingOutboundObserver)BeamFnDataBufferingOutboundObserver.forLocation((PipelineOptions)options, (LogicalEndpoint)this.endpoint, CODER, (StreamObserver)TestStreams.withOnNext(e -> {
            throw new RuntimeException("");
        }).build());
        consumer.accept((Object)new byte[1]);
        while (!consumer.flushFuture.isDone()) {
            Thread.sleep(1L);
        }
        try {
            consumer.accept((Object)new byte[1]);
            Assert.fail();
        }
        catch (Exception exception) {
            // empty catch block
        }
        consumer = (BeamFnDataTimeBasedBufferingOutboundObserver)BeamFnDataBufferingOutboundObserver.forLocation((PipelineOptions)options, (LogicalEndpoint)this.endpoint, CODER, (StreamObserver)TestStreams.withOnNext(e -> {
            throw new RuntimeException("");
        }).build());
        consumer.accept((Object)new byte[1]);
        while (!consumer.flushFuture.isDone()) {
            Thread.sleep(1L);
        }
        try {
            consumer.close();
            Assert.fail();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements.Builder messageWithDataBuilder(byte[] ... datum) throws @UnknownKeyFor @NonNull @Initialized IOException {
        ByteString.Output output = ByteString.newOutput();
        for (byte[] data : datum) {
            CODER.encode((Object)data, (OutputStream)output);
        }
        if (this.endpoint.isTimer()) {
            return BeamFnApi.Elements.newBuilder().addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setTimerFamilyId(this.endpoint.getTimerFamilyId()).setTimers(output.toByteString()));
        }
        return BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(this.endpoint.getInstructionId()).setTransformId(this.endpoint.getTransformId()).setData(output.toByteString()));
    }

    // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements messageWithData(byte[] ... datum) throws @UnknownKeyFor @NonNull @Initialized IOException {
        return this.messageWithDataBuilder(datum).build();
    }
}

