package org.apache.storm.executor;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.daemon.worker.WorkerTransfer;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.WaitStrategyPark;
import org.apache.storm.serialization.ITupleSerializer;
import org.apache.storm.serialization.KryoTupleDeserializer;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.testing.TestWordCounter;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.Utils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.FieldSetter;

/* loaded from: input_file:org/apache/storm/executor/ExecutorTransferMultiThreadingTest.class */
public class ExecutorTransferMultiThreadingTest {
    private WorkerState workerState;
    private Map<String, Object> topoConf;
    private JCQueue transferQueue;
    private GeneralTopologyContext generalTopologyContext;
    private int selfTaskId = 1;
    private String sourceComp = "1";
    private int remoteTaskId = 2;
    private String destComp = "2";
    private static String value1 = "string-value";
    private static int value2 = 1234;

    /* loaded from: input_file:org/apache/storm/executor/ExecutorTransferMultiThreadingTest$SingleThreadedConsumer.class */
    private static class SingleThreadedConsumer implements JCQueue.Consumer {
        KryoTupleDeserializer deserializer;
        int numMessages;
        int msgCount = 0;

        public SingleThreadedConsumer(KryoTupleDeserializer kryoTupleDeserializer, int i) {
            this.deserializer = kryoTupleDeserializer;
            this.numMessages = i;
        }

        public void accept(Object obj) {
            TupleImpl deserialize = this.deserializer.deserialize(((TaskMessage) obj).message());
            Assert.assertEquals(deserialize.getValue(0), ExecutorTransferMultiThreadingTest.value1);
            Assert.assertEquals(deserialize.getValue(1), Integer.valueOf(ExecutorTransferMultiThreadingTest.value2));
            this.msgCount++;
        }

        public void finalCheck() {
            Assert.assertEquals(this.numMessages, this.msgCount);
        }

        public void flush() {
        }
    }

    @Before
    public void setup() throws NoSuchFieldException {
        this.topoConf = Utils.readStormConfig();
        StormTopology createStormTopology = createStormTopology();
        WorkerTopologyContext workerTopologyContext = (WorkerTopologyContext) Mockito.mock(WorkerTopologyContext.class);
        Mockito.when(workerTopologyContext.getRawTopology()).thenReturn(createStormTopology);
        Mockito.when(workerTopologyContext.getComponentId(this.selfTaskId)).thenReturn(this.sourceComp);
        Mockito.when(workerTopologyContext.getComponentId(this.remoteTaskId)).thenReturn(this.destComp);
        this.workerState = (WorkerState) Mockito.mock(WorkerState.class);
        Mockito.when(this.workerState.getWorkerTopologyContext()).thenReturn(workerTopologyContext);
        HashMap hashMap = new HashMap();
        hashMap.put(Integer.valueOf(this.selfTaskId), Mockito.mock(JCQueue.class));
        Mockito.when(this.workerState.getLocalReceiveQueues()).thenReturn(hashMap);
        Mockito.when(this.workerState.getTopologyId()).thenReturn("multi-threaded-topo-test");
        Mockito.when(Integer.valueOf(this.workerState.getPort())).thenReturn(6701);
        Mockito.when(this.workerState.getMetricRegistry()).thenReturn(new StormMetricRegistry());
        Mockito.when(Boolean.valueOf(this.workerState.tryTransferRemote((AddressedTuple) ArgumentMatchers.any(), (Queue) ArgumentMatchers.any(), (ITupleSerializer) ArgumentMatchers.any()))).thenCallRealMethod();
        this.transferQueue = new JCQueue("worker-transfer-queue", 1024, 0, 1, new WaitStrategyPark(100L), this.workerState.getTopologyId(), "__system", -100, this.workerState.getPort(), this.workerState.getMetricRegistry());
        WorkerTransfer workerTransfer = new WorkerTransfer(this.workerState, this.topoConf, 2);
        FieldSetter.setField(workerTransfer, workerTransfer.getClass().getDeclaredField("transferQueue"), this.transferQueue);
        FieldSetter.setField(this.workerState, this.workerState.getClass().getDeclaredField("workerTransfer"), workerTransfer);
        this.generalTopologyContext = (GeneralTopologyContext) Mockito.mock(GeneralTopologyContext.class);
    }

    @Test
    public void testExecutorTransfer() throws InterruptedException {
        ExecutorTransfer executorTransfer = new ExecutorTransfer(this.workerState, this.topoConf);
        executorTransfer.initLocalRecvQueues();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
        Runnable[] runnableArr = new Runnable[10];
        for (int i = 0; i < 10; i++) {
            runnableArr[i] = createProducerTask(executorTransfer);
        }
        for (Runnable runnable : runnableArr) {
            newFixedThreadPool.submit(runnable);
        }
        newFixedThreadPool.awaitTermination(1000L, TimeUnit.MILLISECONDS);
        SingleThreadedConsumer singleThreadedConsumer = new SingleThreadedConsumer(new KryoTupleDeserializer(this.topoConf, this.workerState.getWorkerTopologyContext()), 10);
        this.transferQueue.consume(singleThreadedConsumer);
        singleThreadedConsumer.finalCheck();
        newFixedThreadPool.shutdown();
    }

    private Runnable createProducerTask(final ExecutorTransfer executorTransfer) {
        return new Runnable() { // from class: org.apache.storm.executor.ExecutorTransferMultiThreadingTest.1
            Tuple tuple;
            AddressedTuple addressedTuple;

            {
                this.tuple = new TupleImpl(ExecutorTransferMultiThreadingTest.this.generalTopologyContext, new Values(new Object[]{ExecutorTransferMultiThreadingTest.value1, Integer.valueOf(ExecutorTransferMultiThreadingTest.value2)}), ExecutorTransferMultiThreadingTest.this.sourceComp, ExecutorTransferMultiThreadingTest.this.selfTaskId, "default");
                this.addressedTuple = new AddressedTuple(ExecutorTransferMultiThreadingTest.this.remoteTaskId, this.tuple);
            }

            @Override // java.lang.Runnable
            public void run() {
                executorTransfer.tryTransfer(this.addressedTuple, (Queue) null);
            }
        };
    }

    private StormTopology createStormTopology() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout(this.sourceComp, new TestWordSpout(true), 1);
        topologyBuilder.setBolt(this.destComp, new TestWordCounter(), 1).fieldsGrouping(this.sourceComp, new Fields(new String[]{"word"}));
        return topologyBuilder.createTopology();
    }
}
