package org.apache.storm.streams;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.DefaultDirectedGraph;
import org.apache.storm.streams.operations.aggregators.Count;
import org.apache.storm.streams.processors.AggregateProcessor;
import org.apache.storm.streams.processors.Processor;
import org.apache.storm.streams.windowing.TumblingWindows;
import org.apache.storm.streams.windowing.Window;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/storm/streams/WindowedProcessorBoltTest.class */
public class WindowedProcessorBoltTest {
    TopologyContext mockTopologyContext;
    OutputCollector mockOutputCollector;
    WindowedProcessorBolt bolt;
    Tuple mockTuple1;
    Tuple mockTuple2;
    Tuple mockTuple3;
    DirectedGraph<Node, Edge> graph;
    Multimap<String, ProcessorNode> mockStreamToProcessors;

    @Before
    public void setUp() throws Exception {
        this.mockTopologyContext = (TopologyContext) Mockito.mock(TopologyContext.class);
        this.mockOutputCollector = (OutputCollector) Mockito.mock(OutputCollector.class);
        this.mockTuple1 = (Tuple) Mockito.mock(Tuple.class);
        this.mockTuple2 = (Tuple) Mockito.mock(Tuple.class);
        this.mockTuple3 = (Tuple) Mockito.mock(Tuple.class);
        setUpMockTuples(this.mockTuple1, this.mockTuple2, this.mockTuple3);
        this.mockStreamToProcessors = (Multimap) Mockito.mock(Multimap.class);
    }

    @Test
    public void testEmit() throws Exception {
        setUpWindowedProcessorBolt(new AggregateProcessor(new Count()), TumblingWindows.of(BaseWindowedBolt.Count.of(2)));
        this.bolt.execute(getMockTupleWindow(this.mockTuple1, this.mockTuple2, this.mockTuple3));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(String.class);
        ((OutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(2))).emit((String) forClass2.capture(), (List) forClass.capture());
        Assert.assertEquals("outputstream", forClass2.getAllValues().get(0));
        Assert.assertEquals(new Values(new Object[]{3L}), forClass.getAllValues().get(0));
        Assert.assertEquals("outputstream__punctuation", forClass2.getAllValues().get(1));
        Assert.assertEquals(new Values(new Object[]{"__punctuation"}), forClass.getAllValues().get(1));
    }

    private void setUpWindowedProcessorBolt(Processor<?> processor, Window<?, ?> window) {
        ProcessorNode processorNode = new ProcessorNode(processor, "outputstream", new Fields(new String[]{"value"}));
        processorNode.setWindowed(true);
        Mockito.when(this.mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(processorNode));
        Mockito.when(this.mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
        this.graph = new DefaultDirectedGraph(new StreamsEdgeFactory());
        this.graph.addVertex(processorNode);
        this.bolt = new WindowedProcessorBolt("bolt1", this.graph, Collections.singletonList(processorNode), window);
        this.bolt.setStreamToInitialProcessors(this.mockStreamToProcessors);
        this.bolt.prepare(new HashMap(), this.mockTopologyContext, this.mockOutputCollector);
    }

    private void setUpMockTuples(Tuple... tupleArr) {
        for (Tuple tuple : tupleArr) {
            Mockito.when(Integer.valueOf(tuple.size())).thenReturn(1);
            Mockito.when(tuple.getValue(0)).thenReturn(100);
            Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0");
            Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream");
        }
    }

    private TupleWindow getMockTupleWindow(Tuple... tupleArr) {
        TupleWindow tupleWindow = (TupleWindow) Mockito.mock(TupleWindow.class);
        Mockito.when(tupleWindow.get()).thenReturn(Arrays.asList(tupleArr));
        return tupleWindow;
    }
}
