package org.apache.beam.fn.harness.data;

import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.class */
public class BeamFnDataInboundObserverTest {
    private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Test
    public void testDecodingElements() throws Exception {
        ArrayList arrayList = new ArrayList();
        InboundDataClient create = CompletableFutureInboundDataClient.create();
        Coder<WindowedValue<String>> coder = CODER;
        Objects.requireNonNull(arrayList);
        BeamFnDataInboundObserver beamFnDataInboundObserver = new BeamFnDataInboundObserver(coder, (v1) -> {
            r3.add(v1);
        }, create);
        beamFnDataInboundObserver.accept(dataWith("ABC", "DEF", "GHI"));
        Assert.assertThat(arrayList, Matchers.contains(WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF"), WindowedValue.valueInGlobalWindow("GHI")));
        arrayList.clear();
        Assert.assertFalse(create.isDone());
        beamFnDataInboundObserver.accept(dataWith(new String[0]));
        Assert.assertTrue(create.isDone());
        beamFnDataInboundObserver.accept(dataWith("ABC", "DEF", "GHI"));
        Assert.assertThat(arrayList, Matchers.empty());
    }

    @Test
    public void testConsumptionFailureCompletesReadFutureAndDiscardsMessages() throws Exception {
        InboundDataClient create = CompletableFutureInboundDataClient.create();
        BeamFnDataInboundObserver beamFnDataInboundObserver = new BeamFnDataInboundObserver(CODER, this::throwOnDefValue, create);
        Assert.assertFalse(create.isDone());
        beamFnDataInboundObserver.accept(dataWith("ABC", "DEF", "GHI"));
        Assert.assertTrue(create.isDone());
        this.thrown.expect(ExecutionException.class);
        this.thrown.expectCause(Matchers.instanceOf(RuntimeException.class));
        this.thrown.expectMessage("Failure");
        create.awaitCompletion();
    }

    private void throwOnDefValue(WindowedValue<String> windowedValue) {
        if ("DEF".equals(windowedValue.getValue())) {
            throw new RuntimeException("Failure");
        }
    }

    private BeamFnApi.Elements.Data dataWith(String... strArr) throws Exception {
        BeamFnApi.Elements.Data.Builder target = BeamFnApi.Elements.Data.newBuilder().setInstructionReference("777L").setTarget(BeamFnApi.Target.newBuilder().setPrimitiveTransformReference("999L").setName("Test"));
        ByteString.Output newOutput = ByteString.newOutput();
        for (String str : strArr) {
            CODER.encode(WindowedValue.valueInGlobalWindow(str), newOutput);
        }
        target.setData(newOutput.toByteString());
        return target.build();
    }
}
