package org.apache.beam.sdk.fn.data;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link BeamFnDataInboundObserver}.
 *
 * <p>Every test builds an observer via {@code BeamFnDataInboundObserver.forConsumers(...)}, feeds
 * it {@link BeamFnApi.Elements} from a producer task submitted to {@link #executor}, and calls
 * {@code awaitCompletion()} on the test thread.
 *
 * <p>NOTE(review): the {@code String} passed as first argument to {@code Assert.assertThrows} in
 * these tests is JUnit's assertion-failure message (reported when nothing of the expected type is
 * thrown). It is not matched against the message of the thrown exception, so these tests only
 * verify the exception type.
 */
@RunWith(JUnit4.class)
public class BeamFnDataInboundObserverTest {
    private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
    private static final String TRANSFORM_ID = "transformId";
    private static final String TIMER_FAMILY_ID = "timerFamilyId";

    /** Executor used to run the producer (and closer) tasks off the test thread. */
    @Rule
    public final TestExecutors.TestExecutorService executor = TestExecutors.from((Supplier<ExecutorService>) Executors::newCachedThreadPool);

    /**
     * Elements are accepted on an executor thread, but both consumers assert that they are invoked
     * on the thread that calls {@code awaitCompletion()}.
     */
    @Test
    public void testConsumptionOfValuesHappensOnAwaitCompletionCallersThread() throws Exception {
        Thread callerThread = Thread.currentThread();
        List<WindowedValue<String>> values = new ArrayList<>();
        List<WindowedValue<String>> timers = new ArrayList<>();
        BeamFnDataInboundObserver observer = BeamFnDataInboundObserver.forConsumers(
                Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, value -> {
                    Assert.assertSame(callerThread, Thread.currentThread());
                    values.add(value);
                })),
                Arrays.asList(TimerEndpoint.create(TRANSFORM_ID, TIMER_FAMILY_ID, CODER, timer -> {
                    Assert.assertSame(callerThread, Thread.currentThread());
                    timers.add(timer);
                })));

        Future<?> producer = this.executor.submit(() -> {
            observer.accept(dataWith("ABC", "DEF", "GHI"));
            observer.accept(lastData());
            observer.accept(timerWith("UVW"));
            observer.accept(timerWith("XYZ"));
            observer.accept(lastTimer());
            return null;
        });

        observer.awaitCompletion();
        MatcherAssert.assertThat(values, Matchers.contains(
                WindowedValue.valueInGlobalWindow("ABC"),
                WindowedValue.valueInGlobalWindow("DEF"),
                WindowedValue.valueInGlobalWindow("GHI")));
        MatcherAssert.assertThat(timers, Matchers.contains(
                WindowedValue.valueInGlobalWindow("UVW"),
                WindowedValue.valueInGlobalWindow("XYZ")));
        // Surfaces any exception thrown inside the producer task.
        producer.get();
    }

    /**
     * A consumer that always throws must make both {@code awaitCompletion()} and a subsequent
     * {@code accept(...)} on the producer side throw.
     */
    @Test
    public void testAwaitCompletionFailureVisibleToAwaitCompletionCallerAndProducer() throws Exception {
        BeamFnDataInboundObserver observer = BeamFnDataInboundObserver.forConsumers(
                Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, value -> {
                    throw new Exception("test consumer failed");
                })),
                Collections.emptyList());

        Future<?> producer = this.executor.submit(() -> {
            observer.accept(dataWith("ABC"));
            Assert.assertThrows("test consumer failed", Exception.class, () -> {
                // The loop can only end by accept(...) throwing; presumably the failure is not
                // guaranteed to be visible to the producer on the very next call.
                while (true) {
                    observer.accept(dataWith("ABC"));
                }
            });
            return null;
        });

        Assert.assertThrows("test consumer failed", Exception.class, () -> {
            observer.awaitCompletion();
        });
        producer.get();
    }

    /**
     * Closing the observer from another thread must make both {@code awaitCompletion()} and a
     * subsequent {@code accept(...)} throw {@code BeamFnDataInboundObserver.CloseException}.
     */
    @Test
    public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exception {
        BeamFnDataInboundObserver observer = BeamFnDataInboundObserver.forConsumers(
                Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, value -> {
                })),
                Collections.emptyList());

        Future<?> producer = this.executor.submit(() -> {
            observer.accept(dataWith("ABC"));
            Assert.assertThrows(BeamFnDataInboundObserver.CloseException.class, () -> {
                // The loop can only end by accept(...) throwing.
                while (true) {
                    observer.accept(dataWith("ABC"));
                }
            });
            return null;
        });
        Future<?> closer = this.executor.submit(() -> {
            observer.close();
            return null;
        });

        Assert.assertThrows(BeamFnDataInboundObserver.CloseException.class, () -> {
            observer.awaitCompletion();
        });
        producer.get();
        closer.get();
    }

    /**
     * Sending timers to an observer that was created without any timer endpoint must make both
     * {@code awaitCompletion()} and a subsequent {@code accept(...)} throw
     * {@link IllegalStateException}.
     */
    @Test
    public void testBadProducerDataFailureVisibleToAwaitCompletionCallerAndProducer() throws Exception {
        BeamFnDataInboundObserver observer = BeamFnDataInboundObserver.forConsumers(
                Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, value -> {
                })),
                Collections.emptyList());

        Future<?> producer = this.executor.submit(() -> {
            // No timer endpoint is registered, so this element has no receiver.
            observer.accept(timerWith("DEF"));
            Assert.assertThrows("Unable to find inbound timer receiver for instruction", IllegalStateException.class, () -> {
                // The loop can only end by accept(...) throwing.
                while (true) {
                    observer.accept(dataWith("ABC"));
                }
            });
            return null;
        });

        Assert.assertThrows("Unable to find inbound timer receiver for instruction", IllegalStateException.class, () -> {
            observer.awaitCompletion();
        });
        producer.get();
    }

    /**
     * Encodes each string, wrapped as a value in the global window, with {@link #CODER} into a
     * single stream, in argument order.
     */
    private static ByteStringOutputStream encodeInGlobalWindow(String... values) throws Exception {
        ByteStringOutputStream out = new ByteStringOutputStream();
        for (String value : values) {
            CODER.encode(WindowedValue.valueInGlobalWindow(value), out);
        }
        return out;
    }

    /** Builds an {@code Elements} message carrying the encoded values as data for {@link #TRANSFORM_ID}. */
    private static BeamFnApi.Elements dataWith(String... values) throws Exception {
        return BeamFnApi.Elements.newBuilder()
                .addData(BeamFnApi.Elements.Data.newBuilder()
                        .setTransformId(TRANSFORM_ID)
                        .setData(encodeInGlobalWindow(values).toByteString()))
                .build();
    }

    /** Builds an {@code Elements} message whose single data entry has {@code isLast} set. */
    private static BeamFnApi.Elements lastData() throws Exception {
        return BeamFnApi.Elements.newBuilder()
                .addData(BeamFnApi.Elements.Data.newBuilder()
                        .setTransformId(TRANSFORM_ID)
                        .setIsLast(true))
                .build();
    }

    /**
     * Builds an {@code Elements} message carrying the encoded values as timers for
     * {@link #TRANSFORM_ID} / {@link #TIMER_FAMILY_ID}.
     */
    private static BeamFnApi.Elements timerWith(String... values) throws Exception {
        return BeamFnApi.Elements.newBuilder()
                .addTimers(BeamFnApi.Elements.Timers.newBuilder()
                        .setTransformId(TRANSFORM_ID)
                        .setTimerFamilyId(TIMER_FAMILY_ID)
                        .setTimers(encodeInGlobalWindow(values).toByteString()))
                .build();
    }

    /** Builds an {@code Elements} message whose single timers entry has {@code isLast} set. */
    private static BeamFnApi.Elements lastTimer() throws Exception {
        return BeamFnApi.Elements.newBuilder()
                .addTimers(BeamFnApi.Elements.Timers.newBuilder()
                        .setTransformId(TRANSFORM_ID)
                        .setTimerFamilyId(TIMER_FAMILY_ID)
                        .setIsLast(true))
                .build();
    }
}
