package org.apache.beam.runners.flink.translation.functions;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.class */
public class ImpulseSourceFunctionTest {
    private static final Logger LOG = LoggerFactory.getLogger(ImpulseSourceFunctionTest.class);

    @Rule
    public TestName testName = new TestName();
    private final ImpulseElementMatcher elementMatcher = new ImpulseElementMatcher();
    private final SourceFunction.SourceContext<WindowedValue<byte[]>> sourceContext = (SourceFunction.SourceContext) Mockito.mock(SourceFunction.SourceContext.class);

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest$ImpulseElementMatcher.class */
    private static class ImpulseElementMatcher implements ArgumentMatcher<WindowedValue<byte[]>> {
        private ImpulseElementMatcher() {
        }

        public boolean matches(WindowedValue<byte[]> windowedValue) {
            return (windowedValue instanceof WindowedValue) && Arrays.equals((byte[]) windowedValue.getValue(), new byte[0]);
        }
    }

    public ImpulseSourceFunctionTest() {
        Mockito.when(this.sourceContext.getCheckpointLock()).thenReturn(new Object());
    }

    @Test
    public void testInstanceOfSourceFunction() {
        MatcherAssert.assertThat(new ImpulseSourceFunction(false), IsInstanceOf.instanceOf(SourceFunction.class));
    }

    @Test(timeout = 10000)
    public void testImpulseInitial() throws Exception {
        ImpulseSourceFunction impulseSourceFunction = new ImpulseSourceFunction(false);
        ListState mockListState = getMockListState(Collections.emptyList());
        impulseSourceFunction.initializeState(getInitializationContext(mockListState));
        impulseSourceFunction.run(this.sourceContext);
        ((SourceFunction.SourceContext) Mockito.verify(this.sourceContext)).getCheckpointLock();
        ((SourceFunction.SourceContext) Mockito.verify(this.sourceContext)).collect((WindowedValue) Matchers.argThat(this.elementMatcher));
        Mockito.verifyNoMoreInteractions(new Object[]{this.sourceContext});
        ((ListState) Mockito.verify(mockListState)).get();
        ((ListState) Mockito.verify(mockListState)).add(true);
        Mockito.verifyNoMoreInteractions(new Object[]{mockListState});
    }

    @Test(timeout = 10000)
    public void testImpulseRestored() throws Exception {
        ImpulseSourceFunction impulseSourceFunction = new ImpulseSourceFunction(false);
        ListState mockListState = getMockListState(Collections.singletonList(true));
        impulseSourceFunction.initializeState(getInitializationContext(mockListState));
        impulseSourceFunction.run(this.sourceContext);
        Mockito.verifyNoMoreInteractions(new Object[]{this.sourceContext});
        ((ListState) Mockito.verify(mockListState)).get();
        Mockito.verifyNoMoreInteractions(new Object[]{mockListState});
    }

    @Test(timeout = 10000)
    public void testKeepAlive() throws Exception {
        ImpulseSourceFunction impulseSourceFunction = new ImpulseSourceFunction(true);
        ListState mockListState = getMockListState(Collections.emptyList());
        impulseSourceFunction.initializeState(getInitializationContext(mockListState));
        Thread thread = new Thread(() -> {
            try {
                impulseSourceFunction.run(this.sourceContext);
            } catch (Exception e) {
                LOG.error("Exception while executing ImpulseSourceFunction", e);
            }
        });
        try {
            thread.start();
            impulseSourceFunction.cancel();
            thread.join();
            thread.interrupt();
            thread.join();
            ((SourceFunction.SourceContext) Mockito.verify(this.sourceContext)).collect((WindowedValue) Matchers.argThat(this.elementMatcher));
            ((ListState) Mockito.verify(mockListState)).add(true);
            ((ListState) Mockito.verify(mockListState)).get();
            Mockito.verifyNoMoreInteractions(new Object[]{mockListState});
        } catch (Throwable th) {
            thread.interrupt();
            thread.join();
            throw th;
        }
    }

    @Test(timeout = 10000)
    public void testKeepAliveDuringInterrupt() throws Exception {
        ImpulseSourceFunction impulseSourceFunction = new ImpulseSourceFunction(true);
        impulseSourceFunction.initializeState(getInitializationContext(getMockListState(Collections.singletonList(true))));
        Thread thread = new Thread(() -> {
            try {
                impulseSourceFunction.run(this.sourceContext);
            } catch (Exception e) {
                LOG.error("Exception while executing ImpulseSourceFunction", e);
            }
        });
        thread.start();
        thread.interrupt();
        Thread.sleep(200L);
        MatcherAssert.assertThat(Boolean.valueOf(thread.isAlive()), Is.is(true));
        impulseSourceFunction.cancel();
        thread.interrupt();
        thread.join();
        Mockito.verifyNoMoreInteractions(new Object[]{this.sourceContext});
    }

    private static <T> FunctionInitializationContext getInitializationContext(ListState<T> listState) throws Exception {
        FunctionInitializationContext functionInitializationContext = (FunctionInitializationContext) Mockito.mock(FunctionInitializationContext.class);
        Mockito.when(functionInitializationContext.getOperatorStateStore()).thenReturn(getMockOperatorState(listState));
        return functionInitializationContext;
    }

    private static <T> OperatorStateStore getMockOperatorState(ListState<T> listState) throws Exception {
        OperatorStateStore operatorStateStore = (OperatorStateStore) Mockito.mock(OperatorStateStore.class);
        Mockito.when(operatorStateStore.getListState((ListStateDescriptor) Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
        return operatorStateStore;
    }

    private static <T> ListState<T> getMockListState(List<T> list) throws Exception {
        ListState<T> listState = (ListState) Mockito.mock(ListState.class);
        Mockito.when(listState.get()).thenReturn(list);
        return listState;
    }
}
