/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link ImpulseSourceFunction}.
 *
 * <p>The tests drive the function against a mocked {@link SourceFunction.SourceContext} and a
 * mocked {@link ListState}, then verify which interactions took place: whether the single
 * empty-byte-array element was collected, whether the final {@link Watermark#MAX_WATERMARK} was
 * emitted, and whether the state was read and/or updated.
 */
public class ImpulseSourceFunctionTest {
    private static final Logger LOG = LoggerFactory.getLogger(ImpulseSourceFunctionTest.class);

    @Rule
    public TestName testName = new TestName();

    /** Mocked Flink source context; a fresh mock is created for every test instance. */
    private final SourceFunction.SourceContext<WindowedValue<byte[]>> sourceContext;

    /** Matches the element expected from the source: a windowed value wrapping an empty byte array. */
    private final ImpulseElementMatcher elementMatcher = new ImpulseElementMatcher();

    /** Creates the mocked source context and stubs its checkpoint lock with a plain object. */
    @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot express the generic type argument.
    public ImpulseSourceFunctionTest() {
        this.sourceContext = Mockito.mock(SourceFunction.SourceContext.class);
        Mockito.when(this.sourceContext.getCheckpointLock()).thenReturn(new Object());
    }

    /** The function must be usable wherever a Flink {@link SourceFunction} is expected. */
    @Test
    public void testInstanceOfSourceFunction() {
        MatcherAssert.assertThat(
                new ImpulseSourceFunction(0L), IsInstanceOf.instanceOf(SourceFunction.class));
    }

    /**
     * With empty state, running the source must take the checkpoint lock, collect exactly one
     * impulse element, emit the final watermark, and record {@code true} in the state.
     */
    @Test(timeout = 10000L)
    public void testImpulseInitial() throws Exception {
        ImpulseSourceFunction source = new ImpulseSourceFunction(0L);
        // Empty state: nothing was recorded by an earlier run.
        ListState<Object> mockListState = getMockListState(Collections.emptyList());
        source.initializeState(getInitializationContext(mockListState));

        source.run(this.sourceContext);

        Mockito.verify(this.sourceContext).getCheckpointLock();
        Mockito.verify(this.sourceContext).collect(Matchers.argThat(this.elementMatcher));
        Mockito.verify(this.sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
        Mockito.verifyNoMoreInteractions(this.sourceContext);
        Mockito.verify(mockListState).get();
        Mockito.verify(mockListState).add(true);
        Mockito.verifyNoMoreInteractions(mockListState);
    }

    /**
     * With state already containing {@code true}, running the source must only read the state and
     * emit the final watermark; no element is collected and the state is not written.
     */
    @Test(timeout = 10000L)
    public void testImpulseRestored() throws Exception {
        ImpulseSourceFunction source = new ImpulseSourceFunction(0L);
        // Non-empty state: a previous run already recorded its marker.
        ListState<Boolean> mockListState = getMockListState(Collections.singletonList(true));
        source.initializeState(getInitializationContext(mockListState));

        source.run(this.sourceContext);

        Mockito.verify(mockListState).get();
        Mockito.verifyNoMoreInteractions(mockListState);
        Mockito.verify(this.sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
        Mockito.verifyNoMoreInteractions(this.sourceContext);
    }

    /**
     * A source constructed with {@code Long.MAX_VALUE} (presumably the idle timeout — confirm
     * against {@link ImpulseSourceFunction}) must terminate once {@code cancel()} is called, after
     * having emitted the impulse element and the final watermark.
     */
    @Test(timeout = 10000L)
    public void testKeepAlive() throws Exception {
        ImpulseSourceFunction source = new ImpulseSourceFunction(Long.MAX_VALUE);
        ListState<Object> mockListState = getMockListState(Collections.emptyList());
        source.initializeState(getInitializationContext(mockListState));

        Thread sourceThread = newSourceThread(source);
        try {
            sourceThread.start();
            source.cancel();
            // Cancelling is expected to let run() return, so this join should complete.
            sourceThread.join();
        } finally {
            // Safety net so the runner thread never outlives the test.
            sourceThread.interrupt();
            sourceThread.join();
        }

        Mockito.verify(this.sourceContext).collect(Matchers.argThat(this.elementMatcher));
        Mockito.verify(this.sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
        Mockito.verify(mockListState).add(true);
        Mockito.verify(mockListState).get();
        Mockito.verifyNoMoreInteractions(mockListState);
    }

    /**
     * Interrupting the runner thread without cancelling the source must not stop it; only after
     * {@code cancel()} followed by an interrupt does the thread terminate. Because the state is
     * restored as non-empty, the only interaction with the context is the final watermark.
     */
    @Test(timeout = 10000L)
    public void testKeepAliveDuringInterrupt() throws Exception {
        ImpulseSourceFunction source = new ImpulseSourceFunction(Long.MAX_VALUE);
        ListState<Boolean> mockListState = getMockListState(Collections.singletonList(true));
        source.initializeState(getInitializationContext(mockListState));

        Thread sourceThread = newSourceThread(source);
        sourceThread.start();
        try {
            sourceThread.interrupt();
            // Give the runner thread time to react to the interrupt before checking liveness.
            Thread.sleep(200L);
            MatcherAssert.assertThat(sourceThread.isAlive(), Is.is(true));
        } finally {
            // Always shut the runner down, even when the assertion above fails; otherwise the
            // non-daemon thread would be left running after the test.
            source.cancel();
            sourceThread.interrupt();
            sourceThread.join();
        }

        Mockito.verify(this.sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
        Mockito.verifyNoMoreInteractions(this.sourceContext);
    }

    /**
     * Builds (but does not start) a thread that runs {@code source} against the mocked context.
     * Any exception thrown by {@code run} is logged rather than propagated.
     */
    private Thread newSourceThread(ImpulseSourceFunction source) {
        return new Thread(() -> {
            try {
                source.run(this.sourceContext);
            } catch (Exception e) {
                LOG.error("Exception while executing ImpulseSourceFunction", e);
            }
        });
    }

    /**
     * Returns a mocked initialization context whose operator state store hands out
     * {@code listState} for any list state descriptor.
     */
    private static <T> FunctionInitializationContext getInitializationContext(ListState<T> listState) throws Exception {
        FunctionInitializationContext mock = Mockito.mock(FunctionInitializationContext.class);
        OperatorStateStore mockOperatorState = getMockOperatorState(listState);
        Mockito.when(mock.getOperatorStateStore()).thenReturn(mockOperatorState);
        return mock;
    }

    /** Returns a mocked operator state store that answers every {@code getListState} call with {@code listState}. */
    @SuppressWarnings("unchecked") // any(ListStateDescriptor.class) yields a raw descriptor.
    private static <T> OperatorStateStore getMockOperatorState(ListState<T> listState) throws Exception {
        OperatorStateStore mock = Mockito.mock(OperatorStateStore.class);
        Mockito.when(mock.getListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
        return mock;
    }

    /** Returns a mocked list state whose {@code get()} yields {@code initialState}. */
    @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot express the generic type argument.
    private static <T> ListState<T> getMockListState(List<T> initialState) throws Exception {
        ListState<T> mock = Mockito.mock(ListState.class);
        Mockito.when(mock.get()).thenReturn(initialState);
        return mock;
    }

    /** Mockito matcher accepting a non-null {@link WindowedValue} whose payload is an empty byte array. */
    private static class ImpulseElementMatcher
    implements ArgumentMatcher<WindowedValue<byte[]>> {
        private ImpulseElementMatcher() {
        }

        @Override
        public boolean matches(WindowedValue<byte[]> o) {
            return o != null && Arrays.equals(o.getValue(), new byte[0]);
        }
    }
}

