/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.TimerEndpoint;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class BeamFnDataInboundObserverTest {
    private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
    private static final String TRANSFORM_ID = "transformId";
    private static final String TIMER_FAMILY_ID = "timerFamilyId";
    @Rule
    public final TestExecutors.TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);

    @Test
    public void testConsumptionOfValuesHappensOnAwaitCompletionCallersThread() throws Exception {
        Thread thread = Thread.currentThread();
        ArrayList values = new ArrayList();
        ArrayList timers = new ArrayList();
        BeamFnDataInboundObserver observer = BeamFnDataInboundObserver.forConsumers(Arrays.asList(DataEndpoint.create((String)TRANSFORM_ID, CODER, value -> {
            Assert.assertSame((Object)thread, (Object)Thread.currentThread());
            values.add(value);
        })), Arrays.asList(TimerEndpoint.create((String)TRANSFORM_ID, (String)TIMER_FAMILY_ID, CODER, value -> {
            Assert.assertSame((Object)thread, (Object)Thread.currentThread());
            timers.add(value);
        })));
        Future future = this.executor.submit(() -> {
            observer.accept(this.dataWith("ABC", "DEF", "GHI"));
            observer.accept(this.lastData());
            observer.accept(this.timerWith("UVW"));
            observer.accept(this.timerWith("XYZ"));
            observer.accept(this.lastTimer());
            return null;
        });
        observer.awaitCompletion();
        MatcherAssert.assertThat(values, (Matcher)Matchers.contains((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)"ABC"), WindowedValue.valueInGlobalWindow((Object)"DEF"), WindowedValue.valueInGlobalWindow((Object)"GHI")}));
        MatcherAssert.assertThat(timers, (Matcher)Matchers.contains((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)"UVW"), WindowedValue.valueInGlobalWindow((Object)"XYZ")}));
        future.get();
    }

    @Test
    public void testAwaitCompletionFailureVisibleToAwaitCompletionCallerAndProducer() throws Exception {
        BeamFnDataInboundObserver observer = BeamFnDataInboundObserver.forConsumers(Arrays.asList(DataEndpoint.create((String)TRANSFORM_ID, CODER, value -> {
            throw new Exception("test consumer failed");
        })), Collections.emptyList());
        Future future = this.executor.submit(() -> {
            observer.accept(this.dataWith("ABC"));
            Assert.assertThrows((String)"test consumer failed", Exception.class, () -> {
                while (true) {
                    observer.accept(this.dataWith("ABC"));
                }
            });
            return null;
        });
        Assert.assertThrows((String)"test consumer failed", Exception.class, () -> observer.awaitCompletion());
        future.get();
    }

    @Test
    public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exception {
        BeamFnDataInboundObserver observer = BeamFnDataInboundObserver.forConsumers(Arrays.asList(DataEndpoint.create((String)TRANSFORM_ID, CODER, value -> {})), Collections.emptyList());
        Future future = this.executor.submit(() -> {
            observer.accept(this.dataWith("ABC"));
            Assert.assertThrows(BeamFnDataInboundObserver.CloseException.class, () -> {
                while (true) {
                    observer.accept(this.dataWith("ABC"));
                }
            });
            return null;
        });
        Future future2 = this.executor.submit(() -> {
            observer.close();
            return null;
        });
        Assert.assertThrows(BeamFnDataInboundObserver.CloseException.class, () -> observer.awaitCompletion());
        future.get();
        future2.get();
    }

    @Test
    public void testBadProducerDataFailureVisibleToAwaitCompletionCallerAndProducer() throws Exception {
        BeamFnDataInboundObserver observer = BeamFnDataInboundObserver.forConsumers(Arrays.asList(DataEndpoint.create((String)TRANSFORM_ID, CODER, value -> {})), Collections.emptyList());
        Future future = this.executor.submit(() -> {
            observer.accept(this.timerWith("DEF"));
            Assert.assertThrows((String)"Unable to find inbound timer receiver for instruction", IllegalStateException.class, () -> {
                while (true) {
                    observer.accept(this.dataWith("ABC"));
                }
            });
            return null;
        });
        Assert.assertThrows((String)"Unable to find inbound timer receiver for instruction", IllegalStateException.class, () -> observer.awaitCompletion());
        future.get();
    }

    private BeamFnApi.Elements dataWith(String ... values) throws Exception {
        ByteStringOutputStream output = new ByteStringOutputStream();
        for (String value : values) {
            CODER.encode((Object)WindowedValue.valueInGlobalWindow((Object)value), (OutputStream)output);
        }
        return BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setTransformId(TRANSFORM_ID).setData(output.toByteString())).build();
    }

    private BeamFnApi.Elements lastData() throws Exception {
        return BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setTransformId(TRANSFORM_ID).setIsLast(true)).build();
    }

    private BeamFnApi.Elements timerWith(String ... values) throws Exception {
        ByteStringOutputStream output = new ByteStringOutputStream();
        for (String value : values) {
            CODER.encode((Object)WindowedValue.valueInGlobalWindow((Object)value), (OutputStream)output);
        }
        return BeamFnApi.Elements.newBuilder().addTimers(BeamFnApi.Elements.Timers.newBuilder().setTransformId(TRANSFORM_ID).setTimerFamilyId(TIMER_FAMILY_ID).setTimers(output.toByteString())).build();
    }

    private BeamFnApi.Elements lastTimer() throws Exception {
        return BeamFnApi.Elements.newBuilder().addTimers(BeamFnApi.Elements.Timers.newBuilder().setTransformId(TRANSFORM_ID).setTimerFamilyId(TIMER_FAMILY_ID).setIsLast(true)).build();
    }
}

