package org.apache.storm.topology;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.State;
import org.apache.storm.streams.Pair;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TimestampExtractor;
import org.apache.storm.windowing.TupleWindow;
import org.apache.storm.windowing.WaterMarkEvent;
import org.apache.storm.windowing.WaterMarkEventGenerator;
import org.apache.storm.windowing.persistence.WindowState;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/storm/topology/PersistentWindowedBoltExecutorTest.class */
public class PersistentWindowedBoltExecutorTest {
    private static final String LATE_STREAM = "late_stream";
    private static final String PARTITION_KEY = "pk";
    private static final String EVICTION_STATE_KEY = "es";
    private static final String TRIGGER_STATE_KEY = "ts";
    private static final int WINDOW_EVENT_COUNT = 5;
    private long tupleTs;
    private PersistentWindowedBoltExecutor<KeyValueState<String, String>> executor;
    private IStatefulWindowedBolt<KeyValueState<String, String>> mockBolt;
    private Map<String, Object> testStormConf = new HashMap();
    private OutputCollector mockOutputCollector;
    private TopologyContext mockTopologyContext;
    private TimestampExtractor mockTimestampExtractor;
    private WaterMarkEventGenerator mockWaterMarkEventGenerator;

    @Mock
    private KeyValueState<String, Deque<Long>> mockPartitionState;

    @Mock
    private KeyValueState<Long, WindowState.WindowPartition<Tuple>> mockWindowState;

    @Mock
    private KeyValueState<String, Optional<?>> mockSystemState;

    @Captor
    private ArgumentCaptor<Tuple> tupleCaptor;

    @Captor
    private ArgumentCaptor<Collection<Tuple>> anchorCaptor;

    @Captor
    private ArgumentCaptor<Long> longCaptor;

    @Captor
    private ArgumentCaptor<Values> valuesCaptor;

    @Captor
    private ArgumentCaptor<TupleWindow> tupleWindowCaptor;

    @Captor
    private ArgumentCaptor<Deque<Long>> partitionValuesCaptor;

    @Captor
    private ArgumentCaptor<WindowState.WindowPartition<Tuple>> windowValuesCaptor;

    @Captor
    private ArgumentCaptor<Optional<?>> systemValuesCaptor;

    @Before
    public void setUp() throws Exception {
        this.mockBolt = (IStatefulWindowedBolt) Mockito.mock(IStatefulWindowedBolt.class);
        this.mockWaterMarkEventGenerator = (WaterMarkEventGenerator) Mockito.mock(WaterMarkEventGenerator.class);
        this.mockTimestampExtractor = (TimestampExtractor) Mockito.mock(TimestampExtractor.class);
        this.tupleTs = System.currentTimeMillis();
        Mockito.when(Long.valueOf(this.mockTimestampExtractor.extractTimestamp((Tuple) Mockito.any()))).thenReturn(Long.valueOf(this.tupleTs));
        Mockito.when(this.mockBolt.getTimestampExtractor()).thenReturn(this.mockTimestampExtractor);
        this.mockTopologyContext = (TopologyContext) Mockito.mock(TopologyContext.class);
        Mockito.when(this.mockTopologyContext.getThisStreams()).thenReturn(Collections.singleton(LATE_STREAM));
        this.mockOutputCollector = (OutputCollector) Mockito.mock(OutputCollector.class);
        this.executor = new PersistentWindowedBoltExecutor<>(this.mockBolt);
        this.testStormConf.put("topology.bolts.window.length.count", Integer.valueOf(WINDOW_EVENT_COUNT));
        this.testStormConf.put("topology.bolts.window.sliding.interval.count", Integer.valueOf(WINDOW_EVENT_COUNT));
        this.testStormConf.put("topology.bolts.late.tuple.stream", LATE_STREAM);
        this.testStormConf.put("topology.bolts.watermark.event.interval.ms", 100000);
        this.testStormConf.put("topology.message.timeout.secs", 30);
        this.testStormConf.put("topology.state.checkpoint.interval.ms", 1000);
        Mockito.when(this.mockPartitionState.get(Mockito.any(), Mockito.any())).then(AdditionalAnswers.returnsArgAt(1));
        Mockito.when(this.mockWindowState.get(Mockito.any(), Mockito.any())).then(AdditionalAnswers.returnsArgAt(1));
        Mockito.when(this.mockSystemState.iterator()).thenReturn(ImmutableMap.of(EVICTION_STATE_KEY, Optional.empty(), TRIGGER_STATE_KEY, Optional.empty()).entrySet().iterator());
        this.executor.prepare(this.testStormConf, this.mockTopologyContext, this.mockOutputCollector, this.mockWindowState, this.mockPartitionState, this.mockSystemState);
    }

    @Test
    public void testExecuteTuple() throws Exception {
        Mockito.when(Boolean.valueOf(this.mockWaterMarkEventGenerator.track((GlobalStreamId) Mockito.any(), Mockito.anyLong()))).thenReturn(true);
        Tuple tuple = (Tuple) Mockito.mock(Tuple.class);
        this.executor.initState((State) null);
        this.executor.waterMarkEventGenerator = this.mockWaterMarkEventGenerator;
        this.executor.execute(tuple);
        ((OutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(1))).ack(tuple);
    }

    @Test
    public void testExecuteLatetuple() throws Exception {
        Mockito.when(Boolean.valueOf(this.mockWaterMarkEventGenerator.track((GlobalStreamId) Mockito.any(), Mockito.anyLong()))).thenReturn(false);
        Tuple tuple = (Tuple) Mockito.mock(Tuple.class);
        this.executor.initState((State) null);
        this.executor.waterMarkEventGenerator = this.mockWaterMarkEventGenerator;
        this.executor.execute(tuple);
        ((OutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(1))).ack(tuple);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ((OutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(1))).emit((String) forClass.capture(), (Collection) this.anchorCaptor.capture(), (List) this.valuesCaptor.capture());
        Assert.assertEquals(LATE_STREAM, forClass.getValue());
        Assert.assertEquals(Collections.singletonList(tuple), this.anchorCaptor.getValue());
        Assert.assertEquals(new Values(new Object[]{tuple}), this.valuesCaptor.getValue());
    }

    @Test
    public void testActivation() throws Exception {
        Mockito.when(Boolean.valueOf(this.mockWaterMarkEventGenerator.track((GlobalStreamId) Mockito.any(), Mockito.anyLong()))).thenReturn(true);
        this.executor.initState((State) null);
        this.executor.waterMarkEventGenerator = this.mockWaterMarkEventGenerator;
        List<Tuple> mockTuples = getMockTuples(5L);
        mockTuples.forEach(tuple -> {
            this.executor.execute(tuple);
        });
        ((OutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(WINDOW_EVENT_COUNT))).ack((Tuple) this.tupleCaptor.capture());
        Assert.assertArrayEquals(mockTuples.toArray(), this.tupleCaptor.getAllValues().toArray());
        ((IStatefulWindowedBolt) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.storm.topology.PersistentWindowedBoltExecutorTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m12answer(InvocationOnMock invocationOnMock) throws Throwable {
                TupleWindow tupleWindow = (TupleWindow) invocationOnMock.getArguments()[0];
                Assert.assertEquals(5L, tupleWindow.get().size());
                Assert.assertEquals(5L, tupleWindow.get().size());
                Assert.assertEquals(5L, tupleWindow.get().size());
                return null;
            }
        }).when(this.mockBolt)).execute((TupleWindow) Mockito.any());
        this.executor.getWindowManager().add(new WaterMarkEvent(this.tupleTs + 1000));
        this.executor.prePrepare(0L);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ((KeyValueState) Mockito.verify(this.mockPartitionState, Mockito.times(1))).put(forClass.capture(), this.partitionValuesCaptor.capture());
        Assert.assertEquals(PARTITION_KEY, forClass.getValue());
        List singletonList = Collections.singletonList(0L);
        Assert.assertThat(this.partitionValuesCaptor.getValue(), Matchers.contains(singletonList.toArray(new Long[0])));
        ((KeyValueState) Mockito.verify(this.mockWindowState, Mockito.times(1))).put(this.longCaptor.capture(), this.windowValuesCaptor.capture());
        Assert.assertEquals(((Long) singletonList.get(0)).longValue(), ((Long) this.longCaptor.getValue()).longValue());
        Assert.assertEquals(5L, ((WindowState.WindowPartition) this.windowValuesCaptor.getValue()).size());
        Assert.assertArrayEquals(mockTuples.toArray(), ((List) ((WindowState.WindowPartition) this.windowValuesCaptor.getValue()).getEvents().stream().map((v0) -> {
            return v0.get();
        }).collect(Collectors.toList())).toArray());
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(String.class);
        ((KeyValueState) Mockito.verify(this.mockSystemState, Mockito.times(2))).put(forClass2.capture(), this.systemValuesCaptor.capture());
        Assert.assertEquals(EVICTION_STATE_KEY, forClass2.getAllValues().get(0));
        Assert.assertEquals(Optional.of(Pair.of(5L, 5L)), this.systemValuesCaptor.getAllValues().get(0));
        Assert.assertEquals(TRIGGER_STATE_KEY, forClass2.getAllValues().get(1));
        Assert.assertEquals(Optional.of(Long.valueOf(this.tupleTs)), this.systemValuesCaptor.getAllValues().get(1));
    }

    @Test
    public void testCacheEviction() {
        Mockito.when(Boolean.valueOf(this.mockWaterMarkEventGenerator.track((GlobalStreamId) Mockito.any(), Mockito.anyLong()))).thenReturn(true);
        this.executor.initState((State) null);
        this.executor.waterMarkEventGenerator = this.mockWaterMarkEventGenerator;
        getMockTuples(20000).forEach(tuple -> {
            this.executor.execute(tuple);
        });
        int i = 20000 / 1000;
        ((KeyValueState) Mockito.verify(this.mockWindowState, Mockito.times(i - 10))).put(this.longCaptor.capture(), this.windowValuesCaptor.capture());
        Assert.assertEquals(r0 * 1000, this.windowValuesCaptor.getAllValues().stream().mapToInt(windowPartition -> {
            return windowPartition.size();
        }).sum());
        final HashMap hashMap = new HashMap();
        this.windowValuesCaptor.getAllValues().forEach(windowPartition2 -> {
        });
        ((KeyValueState) Mockito.verify(this.mockPartitionState, Mockito.times(i))).put(ArgumentCaptor.forClass(String.class).capture(), this.partitionValuesCaptor.capture());
        Assert.assertThat(this.partitionValuesCaptor.getAllValues().get(i - 1), Matchers.contains(((List) LongStream.range(0L, i).boxed().collect(Collectors.toList())).toArray(new Long[0])));
        Mockito.when(this.mockWindowState.get(Mockito.any(), Mockito.any())).then(new Answer<Object>() { // from class: org.apache.storm.topology.PersistentWindowedBoltExecutorTest.2
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                WindowState.WindowPartition windowPartition3 = (WindowState.WindowPartition) hashMap.get((Long) invocationOnMock.getArgument(0));
                return windowPartition3 != null ? windowPartition3 : invocationOnMock.getArgument(1);
            }
        });
        ((KeyValueState) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.storm.topology.PersistentWindowedBoltExecutorTest.3
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m13answer(InvocationOnMock invocationOnMock) throws Throwable {
                Object[] arguments = invocationOnMock.getArguments();
                hashMap.put(Long.valueOf(((Long) arguments[0]).longValue()), (WindowState.WindowPartition) arguments[1]);
                return null;
            }
        }).when(this.mockWindowState)).put(Mockito.any(), Mockito.any());
        this.executor.getWindowManager().add(new WaterMarkEvent(this.tupleTs + 1000));
        ((IStatefulWindowedBolt) Mockito.verify(this.mockBolt, Mockito.times(20000 / WINDOW_EVENT_COUNT))).execute((TupleWindow) Mockito.any());
    }

    @Test
    public void testRollbackBeforeInit() throws Exception {
        this.executor.preRollback();
        ((IStatefulWindowedBolt) Mockito.verify(this.mockBolt, Mockito.times(1))).preRollback();
        ArgumentCaptor.forClass(String.class);
        ((KeyValueState) Mockito.verify(this.mockPartitionState, Mockito.times(1))).rollback();
        ((KeyValueState) Mockito.verify(this.mockWindowState, Mockito.times(1))).rollback();
        ((KeyValueState) Mockito.verify(this.mockSystemState, Mockito.times(1))).rollback();
    }

    @Test
    public void testRollbackAfterInit() throws Exception {
        this.executor.initState((State) null);
        this.executor.prePrepare(0L);
        this.executor.preRollback();
        ((IStatefulWindowedBolt) Mockito.verify(this.mockBolt, Mockito.times(1))).preRollback();
        ((KeyValueState) Mockito.verify(this.mockPartitionState, Mockito.times(1))).rollback();
        ((KeyValueState) Mockito.verify(this.mockPartitionState, Mockito.times(2))).put(ArgumentCaptor.forClass(String.class).capture(), this.partitionValuesCaptor.capture());
        ((KeyValueState) Mockito.verify(this.mockWindowState, Mockito.times(1))).rollback();
        ((KeyValueState) Mockito.verify(this.mockSystemState, Mockito.times(1))).rollback();
        ((KeyValueState) Mockito.verify(this.mockSystemState, Mockito.times(2))).iterator();
    }

    private List<Tuple> getMockTuples(long j) {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < j; i++) {
            arrayList.add(Mockito.mock(Tuple.class));
        }
        return arrayList;
    }
}
