package org.apache.apex.malhar.lib.join;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.google.common.collect.Maps;
import java.util.Date;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
/* loaded from: input_file:org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.class */
public class POJOPartitionJoinOperatorTest {
    public static final int NUM_OF_PARTITIONS = 4;
    public static final int TOTAL_TUPLES_PROCESS = 1000;
    private static boolean testFailed = false;

    /* loaded from: input_file:org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest$JoinApp.class */
    public static class JoinApp implements StreamingApplication {
        public PartitionTestJoinOperator joinOp;

        public void populateDAG(DAG dag, Configuration configuration) {
            TestGenerator addOperator = dag.addOperator("Generator1", new TestGenerator());
            TestGenerator addOperator2 = dag.addOperator("Generator2", new TestGenerator());
            this.joinOp = dag.addOperator("Join", new PartitionTestJoinOperator());
            this.joinOp.setLeftKeyExpression("id");
            this.joinOp.setRightKeyExpression("id");
            this.joinOp.setIncludeFieldStr("id,eventTime;id,eventTime");
            this.joinOp.setExpiryTime(10000L);
            ConsoleOutputOperator addOperator3 = dag.addOperator("Console", new ConsoleOutputOperator());
            dag.addStream("Gen1ToJoin", addOperator.output, this.joinOp.input1);
            dag.addStream("Gen2ToJoin", addOperator2.output, this.joinOp.input2);
            dag.addStream("JoinToConsole", this.joinOp.outputPort, addOperator3.input);
            dag.setInputPortAttribute(this.joinOp.input1, DAG.InputPortMeta.TUPLE_CLASS, TestEvent.class);
            dag.setInputPortAttribute(this.joinOp.input2, DAG.InputPortMeta.TUPLE_CLASS, TestEvent.class);
            dag.setOutputPortAttribute(this.joinOp.outputPort, DAG.InputPortMeta.TUPLE_CLASS, TestEvent.class);
            dag.setAttribute(this.joinOp, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(4));
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest$PartitionTestJoinOperator.class */
    public static class PartitionTestJoinOperator extends POJOInnerJoinOperator implements StatsListener {
        public int operatorId;
        HashMap<Integer, Integer> partitionMap = Maps.newHashMap();
        transient CountDownLatch latch = new CountDownLatch(1);
        int tuplesProcessed = 0;

        @AutoMetric
        int tuplesProcessedCompletely = 0;

        public void setup(Context.OperatorContext operatorContext) {
            super.setup(operatorContext);
            this.operatorId = operatorContext.getId();
        }

        protected void processTuple(Object obj, boolean z) {
            if (z) {
                int intValue = ((Integer) extractKey(obj, z)).intValue();
                if (!this.partitionMap.containsKey(Integer.valueOf(intValue))) {
                    this.partitionMap.put(Integer.valueOf(intValue), Integer.valueOf(this.operatorId));
                } else if (this.partitionMap.get(Integer.valueOf(intValue)).intValue() != this.operatorId) {
                    boolean unused = POJOPartitionJoinOperatorTest.testFailed = true;
                }
                this.tuplesProcessed++;
            }
        }

        public void endWindow() {
            super.endWindow();
            this.tuplesProcessedCompletely = this.tuplesProcessed;
        }

        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            this.tuplesProcessedCompletely = ((Integer) ((Stats.OperatorStats) batchedOperatorStats.getLastWindowedStats().get(batchedOperatorStats.getLastWindowedStats().size() - 1)).metrics.get("tuplesProcessedCompletely")).intValue();
            if (this.tuplesProcessedCompletely < 1000) {
                return null;
            }
            this.latch.countDown();
            return null;
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest$TestEvent.class */
    public static class TestEvent {
        public int id;
        public Date eventTime;

        public int getId() {
            return this.id;
        }

        public void setId(int i) {
            this.id = i;
        }

        public Date getEventTime() {
            return this.eventTime;
        }

        public void setEventTime(Date date) {
            this.eventTime = date;
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest$TestGenerator.class */
    public static class TestGenerator extends BaseOperator implements InputOperator {
        public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
        private final transient Random r = new Random();

        public void emitTuples() {
            TestEvent testEvent = new TestEvent();
            testEvent.id = this.r.nextInt(100);
            this.output.emit(testEvent);
        }
    }

    @Test
    public void testJoinOpStreamCodec() throws Exception {
        LocalMode newInstance = LocalMode.newInstance();
        Configuration configuration = new Configuration(false);
        JoinApp joinApp = new JoinApp();
        newInstance.prepareDAG(joinApp, configuration);
        LocalMode.Controller controller = newInstance.getController();
        controller.runAsync();
        joinApp.joinOp.latch.await();
        controller.shutdown();
        Assert.assertFalse(testFailed);
    }
}
