package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/PartitioningTest.class */
public class PartitioningTest {
    private static final Logger LOG = LoggerFactory.getLogger(PartitioningTest.class);
    private static final File TEST_OUTPUT_DIR = new File("target", PartitioningTest.class.getName());

    /* loaded from: input_file:com/datatorrent/stram/PartitioningTest$CollectorOperator.class */
    public static class CollectorOperator extends BaseOperator {
        public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<>();
        private transient int operatorId;
        public String prefix = "";
        public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.PartitioningTest.CollectorOperator.1
            public void process(Object obj) {
                PartitioningTest.LOG.debug("===received tuple {}===", obj);
                Assert.assertNotNull(Integer.valueOf(CollectorOperator.this.operatorId));
                String str = CollectorOperator.this.prefix + CollectorOperator.this.operatorId;
                synchronized (CollectorOperator.receivedTuples) {
                    List<Object> list = CollectorOperator.receivedTuples.get(str);
                    if (list == null) {
                        list = Collections.synchronizedList(new ArrayList());
                        CollectorOperator.receivedTuples.put(str, list);
                    }
                    list.add(obj);
                }
                if (CollectorOperator.this.output.isConnected()) {
                    CollectorOperator.this.output.emit(obj);
                }
            }
        };

        @OutputPortFieldAnnotation(optional = true)
        public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();

        public void setup(Context.OperatorContext operatorContext) {
            this.operatorId = operatorContext.getId();
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/PartitioningTest$PartitionLoadWatch.class */
    public static class PartitionLoadWatch implements StatsListener, Serializable {
        private static final long serialVersionUID = 1;
        private static final ThreadLocal<Map<Integer, Integer>> loadIndicators = new ThreadLocal<>();

        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            Integer num;
            StatsListener.Response response = null;
            Map<Integer, Integer> map = loadIndicators.get();
            if (map != null && (num = map.get(Integer.valueOf(batchedOperatorStats.getOperatorId()))) != null) {
                response = new StatsListener.Response();
                response.repartitionRequired = true;
                response.loadIndicator = num.intValue();
            }
            return response;
        }

        public static void put(PTOperator pTOperator, int i) {
            Map<Integer, Integer> map = loadIndicators.get();
            if (map == null) {
                ThreadLocal<Map<Integer, Integer>> threadLocal = loadIndicators;
                ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                map = concurrentHashMap;
                threadLocal.set(concurrentHashMap);
            }
            map.put(Integer.valueOf(pTOperator.getId()), Integer.valueOf(i));
        }

        public static void remove(PTOperator pTOperator) {
            loadIndicators.get().remove(Integer.valueOf(pTOperator.getId()));
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/PartitioningTest$PartitionableInputOperator.class */
    public static class PartitionableInputOperator extends BaseOperator implements InputOperator, Partitioner<PartitionableInputOperator> {
        String partitionProperty = "partition";

        public void emitTuples() {
        }

        public Collection<Partitioner.Partition<PartitionableInputOperator>> definePartitions(Collection<Partitioner.Partition<PartitionableInputOperator>> collection, Partitioner.PartitioningContext partitioningContext) {
            ArrayList arrayList = new ArrayList(3);
            Iterator<Partitioner.Partition<PartitionableInputOperator>> it = collection.iterator();
            for (int i = 0; i < 3; i++) {
                PartitionableInputOperator partitionableInputOperator = new PartitionableInputOperator();
                if (it.hasNext()) {
                    partitionableInputOperator.partitionProperty = ((PartitionableInputOperator) it.next().getPartitionedInstance()).partitionProperty;
                }
                partitionableInputOperator.partitionProperty += "_" + i;
                arrayList.add(new DefaultPartition(partitionableInputOperator));
            }
            return arrayList;
        }

        public void partitioned(Map<Integer, Partitioner.Partition<PartitionableInputOperator>> map) {
        }

        private void testInputOperatorPartitioning(LogicalPlan logicalPlan) throws Exception {
            File file = new File(PartitioningTest.TEST_OUTPUT_DIR, "testInputOperatorPartitioning");
            logicalPlan.getAttributes().put(LogicalPlan.APPLICATION_PATH, file.getPath());
            logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(file.getPath(), (Configuration) null));
            PartitionableInputOperator addOperator = logicalPlan.addOperator("input", new PartitionableInputOperator());
            logicalPlan.setOperatorAttribute(addOperator, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitionLoadWatch()));
            StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
            stramLocalCluster.setHeartbeatMonitoringEnabled(false);
            stramLocalCluster.runAsync();
            List<PTOperator> assertNumberPartitions = PartitioningTest.assertNumberPartitions(3, stramLocalCluster, logicalPlan.getMeta(addOperator));
            HashSet hashSet = new HashSet();
            for (PTOperator pTOperator : assertNumberPartitions) {
                Map nodes = StramTestSupport.waitForActivation(stramLocalCluster, pTOperator).getNodes();
                Assert.assertEquals("number operators " + nodes, 1L, nodes.size());
                PartitionableInputOperator operator = ((Node) nodes.get(Integer.valueOf(pTOperator.getId()))).getOperator();
                Assert.assertNotNull(operator);
                hashSet.add(operator.partitionProperty);
                Checkpoint checkpoint = new Checkpoint(10L, 0, 0);
                pTOperator.checkpoints.add(checkpoint);
                pTOperator.setRecoveryCheckpoint(checkpoint);
                AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(file.getPath(), (Configuration) null);
                asyncFSStorageAgent.save(operator, pTOperator.getId(), 10L);
                asyncFSStorageAgent.copyToHDFS(pTOperator.getId(), 10L);
            }
            Assert.assertEquals("", Sets.newHashSet(new String[]{"partition_0", "partition_1", "partition_2"}), hashSet);
            PartitionLoadWatch.put((PTOperator) assertNumberPartitions.get(0), 1);
            long currentTimeMillis = System.currentTimeMillis();
            for (int i = 0; i == 0 && currentTimeMillis > System.currentTimeMillis() - StramTestSupport.DEFAULT_TIMEOUT_MILLIS; i += stramLocalCluster.dnmgr.processEvents()) {
            }
            PartitionLoadWatch.remove((PTOperator) assertNumberPartitions.get(0));
            List<PTOperator> assertNumberPartitions2 = PartitioningTest.assertNumberPartitions(3, stramLocalCluster, logicalPlan.getMeta(addOperator));
            HashSet hashSet2 = new HashSet();
            for (PTOperator pTOperator2 : assertNumberPartitions2) {
                Map nodes2 = StramTestSupport.waitForActivation(stramLocalCluster, pTOperator2).getNodes();
                Assert.assertEquals("number operators " + nodes2, 1L, nodes2.size());
                PartitionableInputOperator operator2 = ((Node) nodes2.get(Integer.valueOf(pTOperator2.getId()))).getOperator();
                Assert.assertNotNull(operator2);
                hashSet2.add(operator2.partitionProperty);
            }
            Assert.assertEquals("", Sets.newHashSet(new String[]{"partition_0_0", "partition_1_1", "partition_2_2"}), hashSet2);
            stramLocalCluster.shutdown();
        }

        @Test
        public void testInputOperatorPartitioningWithAsyncStorageAgent() throws Exception {
            testInputOperatorPartitioning(new LogicalPlan());
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/PartitioningTest$TestInputOperator.class */
    public static class TestInputOperator<T> extends BaseOperator implements InputOperator {
        transient boolean first;
        transient long windowId;
        public List<List<T>> testTuples;
        public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
        boolean blockEndStream = false;

        public void emitTuples() {
            if (this.testTuples == null || this.testTuples.isEmpty()) {
                if (this.blockEndStream) {
                    return;
                } else {
                    BaseOperator.shutdown();
                }
            }
            if (this.first) {
                for (T t : this.testTuples.remove(0)) {
                    this.output.emit(t);
                    PartitioningTest.LOG.debug("sent tuple ==={}===", t);
                }
                this.first = false;
            }
        }

        public void beginWindow(long j) {
            this.windowId = j;
            this.first = true;
        }
    }

    @Before
    public void setup() throws IOException {
        StreamingContainer.eventloop.start();
    }

    @After
    public void teardown() {
        StreamingContainer.eventloop.stop();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void testDefaultPartitioning() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new File(TEST_OUTPUT_DIR, "testDefaultPartitioning").getPath(), (Configuration) null));
        Integer[] numArr = {new Integer[]{4, 5}};
        CollectorOperator.receivedTuples.clear();
        TestInputOperator addOperator = logicalPlan.addOperator("input", new TestInputOperator());
        addOperator.testTuples = new ArrayList();
        for (Object[] objArr : numArr) {
            addOperator.testTuples.add(new ArrayList(Arrays.asList(objArr)));
        }
        CollectorOperator addOperator2 = logicalPlan.addOperator("collector", new CollectorOperator());
        addOperator2.prefix = "" + System.identityHashCode(addOperator2);
        logicalPlan.getMeta(addOperator2).getAttributes().put(Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        logicalPlan.addStream("fromInput", addOperator.output, addOperator2.input);
        CollectorOperator addOperator3 = logicalPlan.addOperator("merged", new CollectorOperator());
        addOperator3.prefix = "" + System.identityHashCode(addOperator3);
        logicalPlan.addStream("toMerged", addOperator2.output, addOperator3.input);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
        stramLocalCluster.setHeartbeatMonitoringEnabled(false);
        stramLocalCluster.run();
        List planOperators = stramLocalCluster.getPlanOperators(logicalPlan.getMeta(addOperator2));
        Assert.assertEquals("number operator instances " + planOperators, 2L, planOperators.size());
        Assert.assertEquals("received tuples " + CollectorOperator.receivedTuples, 3L, CollectorOperator.receivedTuples.size());
        Assert.assertEquals("received tuples " + planOperators.get(1), Arrays.asList(5), CollectorOperator.receivedTuples.get(addOperator2.prefix + ((PTOperator) planOperators.get(1)).getId()));
        PTOperator findByLogicalNode = stramLocalCluster.findByLogicalNode(logicalPlan.getMeta(addOperator3));
        List<Object> list = CollectorOperator.receivedTuples.get(addOperator3.prefix + findByLogicalNode.getId());
        Assert.assertNotNull("merged tuples " + findByLogicalNode, list);
        Assert.assertEquals("merged tuples " + findByLogicalNode, Sets.newHashSet(numArr[0]), Sets.newHashSet(list));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static List<PTOperator> assertNumberPartitions(final int i, final StramLocalCluster stramLocalCluster, final LogicalPlan.OperatorMeta operatorMeta) throws Exception {
        StramTestSupport.WaitCondition waitCondition = new StramTestSupport.WaitCondition() { // from class: com.datatorrent.stram.PartitioningTest.1
            @Override // com.datatorrent.stram.support.StramTestSupport.WaitCondition
            public boolean isComplete() {
                List planOperators = stramLocalCluster.getPlanOperators(operatorMeta);
                PartitioningTest.LOG.debug("Number of operators {}, expected number {}", Integer.valueOf(planOperators.size()), Integer.valueOf(i));
                return planOperators.size() == i;
            }
        };
        StramTestSupport.awaitCompletion(waitCondition, 10000L);
        Assert.assertTrue("Number partitions match " + operatorMeta, waitCondition.isComplete());
        return stramLocalCluster.getPlanOperators(operatorMeta);
    }

    @Test
    public void testDynamicDefaultPartitioning() throws Exception {
        List<Object> list;
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new File(TEST_OUTPUT_DIR, "testDynamicDefaultPartitioning").getPath(), (Configuration) null));
        CollectorOperator.receivedTuples.clear();
        TestInputOperator addOperator = logicalPlan.addOperator("input", new TestInputOperator());
        addOperator.blockEndStream = true;
        CollectorOperator addOperator2 = logicalPlan.addOperator("partitionedCollector", new CollectorOperator());
        addOperator2.prefix = "" + System.identityHashCode(addOperator2);
        logicalPlan.setOperatorAttribute(addOperator2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        logicalPlan.setOperatorAttribute(addOperator2, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitionLoadWatch()));
        logicalPlan.addStream("fromInput", addOperator.output, addOperator2.input);
        CollectorOperator addOperator3 = logicalPlan.addOperator("singleCollector", new CollectorOperator());
        addOperator3.prefix = "" + System.identityHashCode(addOperator3);
        logicalPlan.addStream("toSingleCollector", addOperator2.output, addOperator3.input);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
        stramLocalCluster.setHeartbeatMonitoringEnabled(false);
        stramLocalCluster.runAsync();
        List<PTOperator> assertNumberPartitions = assertNumberPartitions(2, stramLocalCluster, logicalPlan.getMeta(addOperator2));
        HashSet newHashSet = Sets.newHashSet();
        Iterator<PTOperator> it = assertNumberPartitions.iterator();
        while (it.hasNext()) {
            newHashSet.add(it.next().getContainer());
        }
        Assert.assertTrue("Number of containers are 4", 4 == stramLocalCluster.dnmgr.getPhysicalPlan().getContainers().size());
        PTOperator pTOperator = assertNumberPartitions.get(0);
        PartitionLoadWatch.put(pTOperator, 1);
        LOG.debug("Triggered split for {}", pTOperator);
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i == 0 && currentTimeMillis > System.currentTimeMillis() - StramTestSupport.DEFAULT_TIMEOUT_MILLIS; i += stramLocalCluster.dnmgr.processEvents()) {
            Thread.sleep(20L);
        }
        List<PTOperator> assertNumberPartitions2 = assertNumberPartitions(3, stramLocalCluster, logicalPlan.getMeta(addOperator2));
        Assert.assertTrue("container reused", stramLocalCluster.dnmgr.getPhysicalPlan().getContainers().containsAll(newHashSet));
        Iterator<PTOperator> it2 = assertNumberPartitions2.iterator();
        while (it2.hasNext()) {
            StramTestSupport.waitForActivation(stramLocalCluster, it2.next());
        }
        PartitionLoadWatch.remove(pTOperator);
        Iterator it3 = stramLocalCluster.dnmgr.getPhysicalPlan().getContainers().iterator();
        while (it3.hasNext()) {
            int i2 = 0;
            for (PTOperator pTOperator2 : ((PTContainer) it3.next()).getOperators()) {
                i2 = i2 + pTOperator2.getBufferServerMemory() + ((Integer) pTOperator2.getOperatorMeta().getValue(Context.OperatorContext.MEMORY_MB)).intValue();
            }
            Assert.assertEquals("memory", i2, r0.getRequiredMemoryMB());
        }
        PTOperator findByLogicalNode = stramLocalCluster.findByLogicalNode(logicalPlan.getMeta(addOperator));
        Map nodes = StramTestSupport.waitForActivation(stramLocalCluster, findByLogicalNode).getNodes();
        Assert.assertEquals("number operators " + nodes, 1L, nodes.size());
        TestInputOperator operator = ((Node) nodes.get(Integer.valueOf(findByLogicalNode.getId()))).getOperator();
        Assert.assertNotNull("" + nodes, operator);
        ArrayList arrayList = new ArrayList();
        LOG.debug("Number of partitions {}", Integer.valueOf(assertNumberPartitions2.size()));
        for (PTOperator pTOperator3 : assertNumberPartitions2) {
            LOG.debug("Partition key map size: {}", Integer.valueOf(pTOperator3.getPartitionKeys().size()));
            arrayList.add(((Partitioner.PartitionKeys) pTOperator3.getPartitionKeys().values().iterator().next()).partitions.iterator().next());
        }
        operator.testTuples = Collections.synchronizedList(new ArrayList());
        operator.testTuples.add(arrayList);
        for (PTOperator pTOperator4 : assertNumberPartitions2) {
            Integer num = (Integer) ((Partitioner.PartitionKeys) pTOperator4.getPartitionKeys().values().iterator().next()).partitions.iterator().next();
            int i3 = 0;
            while (true) {
                list = CollectorOperator.receivedTuples.get(addOperator2.prefix + pTOperator4.getId());
                if (list == null || list.isEmpty()) {
                    int i4 = i3;
                    i3++;
                    if (i4 % 100 == 0) {
                        LOG.debug("Waiting for tuple: " + pTOperator4);
                    }
                    Thread.sleep(10L);
                }
            }
            Assert.assertEquals("received " + pTOperator4, Arrays.asList(num), list);
        }
        List planOperators = stramLocalCluster.getPlanOperators(logicalPlan.getMeta(addOperator3));
        Assert.assertEquals("number output operator instances " + planOperators, 1L, planOperators.size());
        StramTestSupport.waitForActivation(stramLocalCluster, (PTOperator) planOperators.get(0));
        while (true) {
            List<Object> list2 = CollectorOperator.receivedTuples.get(addOperator3.prefix + ((PTOperator) planOperators.get(0)).getId());
            if (list2 != null && list2.size() >= arrayList.size()) {
                Assert.assertEquals("output tuples " + list2, Sets.newHashSet(arrayList), Sets.newHashSet(list2));
                stramLocalCluster.shutdown();
                return;
            } else {
                LOG.debug("Waiting for tuple: " + planOperators.get(0) + " expected: " + arrayList + " received: " + list2);
                Thread.sleep(20L);
            }
        }
    }
}
