package org.apache.storm.streams;

import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.DefaultDirectedGraph;
import org.apache.storm.streams.operations.aggregators.LongSum;
import org.apache.storm.streams.processors.AggregateProcessor;
import org.apache.storm.streams.processors.FilterProcessor;
import org.apache.storm.streams.processors.Processor;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/storm/streams/ProcessorBoltTest.class */
public class ProcessorBoltTest {
    TopologyContext mockTopologyContext;
    OutputCollector mockOutputCollector;
    ProcessorBolt bolt;
    Tuple mockTuple1;
    Tuple mockTuple2;
    Tuple mockTuple3;
    Tuple punctuation;
    Multimap<String, ProcessorNode> mockStreamToProcessors;
    DirectedGraph<Node, Edge> graph;

    @Before
    public void setUp() throws Exception {
        this.mockTopologyContext = (TopologyContext) Mockito.mock(TopologyContext.class);
        this.mockOutputCollector = (OutputCollector) Mockito.mock(OutputCollector.class);
        this.mockTuple1 = (Tuple) Mockito.mock(Tuple.class);
        this.mockTuple2 = (Tuple) Mockito.mock(Tuple.class);
        this.mockTuple3 = (Tuple) Mockito.mock(Tuple.class);
        setUpMockTuples(this.mockTuple1, this.mockTuple2, this.mockTuple3);
        this.punctuation = (Tuple) Mockito.mock(Tuple.class);
        setUpPunctuation(this.punctuation);
        this.mockStreamToProcessors = (Multimap) Mockito.mock(Multimap.class);
        this.graph = new DefaultDirectedGraph(new StreamsEdgeFactory());
    }

    @Test
    public void testEmitAndAck() throws Exception {
        setUpProcessorBolt(new FilterProcessor(num -> {
            return true;
        }));
        this.bolt.execute(this.mockTuple1);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Collection.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(String.class);
        ((OutputCollector) Mockito.verify(this.mockOutputCollector)).emit((String) forClass3.capture(), (Collection) forClass.capture(), (List) forClass2.capture());
        Assert.assertEquals("outputstream", forClass3.getValue());
        Assert.assertArrayEquals(new Object[]{this.mockTuple1}, ((Collection) forClass.getValue()).toArray());
        Assert.assertEquals(new Values(new Object[]{100}), forClass2.getValue());
        ((OutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(1))).ack(this.mockTuple1);
    }

    @Test
    public void testAggResultAndAck() throws Exception {
        setUpProcessorBolt(new AggregateProcessor(new LongSum()), Collections.singleton("inputstream"), true, null);
        this.bolt.execute(this.mockTuple2);
        this.bolt.execute(this.mockTuple3);
        this.bolt.execute(this.punctuation);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Collection.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(String.class);
        ((OutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(2))).emit((String) forClass3.capture(), (Collection) forClass.capture(), (List) forClass2.capture());
        Assert.assertArrayEquals(new Object[]{this.mockTuple2, this.mockTuple3, this.punctuation}, ((Collection) forClass.getAllValues().get(0)).toArray());
        Assert.assertArrayEquals(new Object[]{this.mockTuple2, this.mockTuple3, this.punctuation}, ((Collection) forClass.getAllValues().get(1)).toArray());
        Assert.assertArrayEquals(new Object[]{new Values(new Object[]{200L}), new Values(new Object[]{"__punctuation"})}, forClass2.getAllValues().toArray());
        Assert.assertArrayEquals(new Object[]{"outputstream", "outputstream__punctuation"}, forClass3.getAllValues().toArray());
        ((OutputCollector) Mockito.verify(this.mockOutputCollector)).ack(this.mockTuple2);
        ((OutputCollector) Mockito.verify(this.mockOutputCollector)).ack(this.mockTuple3);
        ((OutputCollector) Mockito.verify(this.mockOutputCollector)).ack(this.punctuation);
    }

    @Test
    public void testEmitTs() throws Exception {
        Tuple tuple = (Tuple) Mockito.mock(Tuple.class);
        setUpMockTuples(tuple);
        Mockito.when(tuple.getLongByField("ts")).thenReturn(12345L);
        setUpProcessorBolt(new FilterProcessor(obj -> {
            return true;
        }), "ts");
        this.bolt.execute(tuple);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Collection.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(String.class);
        ((OutputCollector) Mockito.verify(this.mockOutputCollector)).emit((String) forClass3.capture(), (Collection) forClass.capture(), (List) forClass2.capture());
        Assert.assertEquals("outputstream", forClass3.getValue());
        Assert.assertArrayEquals(new Object[]{tuple}, ((Collection) forClass.getValue()).toArray());
        Assert.assertEquals(new Values(new Object[]{100, 12345L}), forClass2.getValue());
        ((OutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(1))).ack(tuple);
    }

    private void setUpProcessorBolt(Processor<?> processor) {
        setUpProcessorBolt(processor, Collections.emptySet(), false, null);
    }

    private void setUpProcessorBolt(Processor<?> processor, String str) {
        setUpProcessorBolt(processor, Collections.emptySet(), false, str);
    }

    private void setUpProcessorBolt(Processor<?> processor, Set<String> set, boolean z, String str) {
        ProcessorNode processorNode = new ProcessorNode(processor, "outputstream", new Fields(new String[]{"value"}));
        processorNode.setWindowedParentStreams(set);
        processorNode.setWindowed(z);
        Mockito.when(this.mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(processorNode));
        Mockito.when(this.mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
        Map map = (Map) Mockito.mock(Map.class);
        GlobalStreamId globalStreamId = (GlobalStreamId) Mockito.mock(GlobalStreamId.class);
        Mockito.when(this.mockTopologyContext.getThisSources()).thenReturn(map);
        Mockito.when(map.keySet()).thenReturn(Collections.singleton(globalStreamId));
        Mockito.when(globalStreamId.get_streamId()).thenReturn("inputstream");
        Mockito.when(globalStreamId.get_componentId()).thenReturn("bolt0");
        Mockito.when(this.mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
        this.graph.addVertex(processorNode);
        this.bolt = new ProcessorBolt("bolt1", this.graph, Collections.singletonList(processorNode));
        if (str != null && !str.isEmpty()) {
            this.bolt.setTimestampField(str);
        }
        this.bolt.setStreamToInitialProcessors(this.mockStreamToProcessors);
        this.bolt.prepare(new HashMap(), this.mockTopologyContext, this.mockOutputCollector);
    }

    private void setUpMockTuples(Tuple... tupleArr) {
        for (Tuple tuple : tupleArr) {
            Mockito.when(Integer.valueOf(tuple.size())).thenReturn(1);
            Mockito.when(tuple.getValue(0)).thenReturn(100);
            Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0");
            Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream");
        }
    }

    private void setUpPunctuation(Tuple tuple) {
        Mockito.when(Integer.valueOf(tuple.size())).thenReturn(1);
        Mockito.when(tuple.getValue(0)).thenReturn("__punctuation");
        Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0");
        Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream");
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1807685350:
                if (implMethodName.equals("lambda$testEmitAndAck$48a57afa$1")) {
                    z = false;
                    break;
                }
                break;
            case 528890541:
                if (implMethodName.equals("lambda$testEmitTs$48a57afa$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/storm/streams/operations/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/storm/streams/ProcessorBoltTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Z")) {
                    return num -> {
                        return true;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/storm/streams/operations/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/storm/streams/ProcessorBoltTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    return obj -> {
                        return true;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
