package org.apache.beam.sdk.fn.data;

import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.class */
public class BeamFnDataInboundObserverTest {
    private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
    private static final LogicalEndpoint DATA_ENDPOINT = LogicalEndpoint.data("777L", "999L");

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Test
    public void testDecodingElements() throws Exception {
        ArrayList arrayList = new ArrayList();
        InboundDataClient create = CompletableFutureInboundDataClient.create();
        LogicalEndpoint logicalEndpoint = DATA_ENDPOINT;
        Coder<WindowedValue<String>> coder = CODER;
        Objects.requireNonNull(arrayList);
        BeamFnDataInboundObserver beamFnDataInboundObserver = new BeamFnDataInboundObserver(logicalEndpoint, DecodingFnDataReceiver.create(coder, (v1) -> {
            r4.add(v1);
        }), create);
        beamFnDataInboundObserver.accept(dataWith("ABC", "DEF", "GHI"), (Boolean) false);
        MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF"), WindowedValue.valueInGlobalWindow("GHI")));
        arrayList.clear();
        Assert.assertFalse(create.isDone());
        beamFnDataInboundObserver.accept(ByteString.EMPTY, (Boolean) true);
        Assert.assertTrue(create.isDone());
        beamFnDataInboundObserver.accept(dataWith("ABC", "DEF", "GHI"), (Boolean) false);
        MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
    }

    @Test
    public void testConsumptionFailureCompletesReadFutureAndDiscardsMessages() throws Exception {
        InboundDataClient create = CompletableFutureInboundDataClient.create();
        BeamFnDataInboundObserver beamFnDataInboundObserver = new BeamFnDataInboundObserver(DATA_ENDPOINT, DecodingFnDataReceiver.create(CODER, this::throwOnDefValue), create);
        Assert.assertFalse(create.isDone());
        beamFnDataInboundObserver.accept(dataWith("ABC", "DEF", "GHI"), (Boolean) false);
        Assert.assertTrue(create.isDone());
        this.thrown.expect(ExecutionException.class);
        this.thrown.expectCause(Matchers.instanceOf(RuntimeException.class));
        this.thrown.expectMessage("Failure");
        create.awaitCompletion();
    }

    private void throwOnDefValue(WindowedValue<String> windowedValue) {
        if ("DEF".equals(windowedValue.getValue())) {
            throw new RuntimeException("Failure");
        }
    }

    private ByteString dataWith(String... strArr) throws Exception {
        ByteString.Output newOutput = ByteString.newOutput();
        for (String str : strArr) {
            CODER.encode(WindowedValue.valueInGlobalWindow(str), newOutput);
        }
        return newOutput.toByteString();
    }
}
