package org.apache.beam.runners.flink.translation.functions;

import java.util.Arrays;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.class */
public class ImpulseSourceFunctionTest {
    private static final Logger LOG = LoggerFactory.getLogger(ImpulseSourceFunctionTest.class);

    @Rule
    public TestName testName = new TestName();
    private final ImpulseElementMatcher elementMatcher = new ImpulseElementMatcher();
    private final SourceFunction.SourceContext<WindowedValue<byte[]>> sourceContext = (SourceFunction.SourceContext) Mockito.mock(SourceFunction.SourceContext.class);

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest$ImpulseElementMatcher.class */
    private static class ImpulseElementMatcher extends ArgumentMatcher<WindowedValue<byte[]>> {
        private ImpulseElementMatcher() {
        }

        public boolean matches(Object obj) {
            return (obj instanceof WindowedValue) && Arrays.equals((byte[]) ((WindowedValue) obj).getValue(), new byte[0]);
        }
    }

    @Test
    public void testInstanceOfSourceFunction() {
        Assert.assertThat(new ImpulseSourceFunction(false), IsInstanceOf.instanceOf(SourceFunction.class));
    }

    @Test(timeout = 10000)
    public void testImpulse() throws Exception {
        new ImpulseSourceFunction(false).run(this.sourceContext);
        ((SourceFunction.SourceContext) Mockito.verify(this.sourceContext)).collect((WindowedValue) Matchers.argThat(this.elementMatcher));
    }

    @Test(timeout = 10000)
    public void testKeepAlive() throws Exception {
        ImpulseSourceFunction impulseSourceFunction = new ImpulseSourceFunction(true);
        Thread thread = new Thread(() -> {
            try {
                impulseSourceFunction.run(this.sourceContext);
            } catch (Exception e) {
                LOG.error("Exception while executing ImpulseSourceFunction", e);
            }
        });
        try {
            thread.start();
            impulseSourceFunction.cancel();
            thread.join();
            ((SourceFunction.SourceContext) Mockito.verify(this.sourceContext)).collect((WindowedValue) Matchers.argThat(this.elementMatcher));
        } finally {
            thread.interrupt();
            thread.join();
        }
    }

    @Test(timeout = 10000)
    public void testKeepAliveDuringInterrupt() throws Exception {
        ImpulseSourceFunction impulseSourceFunction = new ImpulseSourceFunction(true);
        Thread thread = new Thread(() -> {
            try {
                impulseSourceFunction.run(this.sourceContext);
            } catch (Exception e) {
                LOG.error("Exception while executing ImpulseSourceFunction", e);
            }
        });
        thread.start();
        thread.interrupt();
        Thread.sleep(200L);
        Assert.assertThat(Boolean.valueOf(thread.isAlive()), Is.is(true));
        impulseSourceFunction.cancel();
        thread.interrupt();
        thread.join();
        ((SourceFunction.SourceContext) Mockito.verify(this.sourceContext)).collect((WindowedValue) Matchers.argThat(this.elementMatcher));
    }
}
