package com.datatorrent.lib.join;

import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import com.esotericsoftware.kryo.Kryo;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.class */
public class POJOTimeBasedJoinOperatorTest {

    @Rule
    public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();

    /* loaded from: input_file:com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest$CustOrder.class */
    /**
     * Joined customer/order tuple. The tests in this file hand this class to
     * the join operator as its {@code outputClass}.
     */
    public static class CustOrder {
        public int ID;
        public String Name;
        public int OID;
        public int Amount;

        /**
         * Renders the tuple as {@code {ID=.., Name='..', OID=.., Amount=..}};
         * a {@code null} name is printed as the text {@code null}.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("{ID=");
            text.append(ID);
            text.append(", Name='").append(Name);
            text.append("', OID=").append(OID);
            text.append(", Amount=").append(Amount);
            text.append('}');
            return text.toString();
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest$Customer.class */
    /** Customer tuple that the tests feed into the operator's {@code input1} port. */
    public class Customer {
        public int ID;
        public String Name;

        /** Creates a customer whose fields keep their Java defaults (0 / null). */
        public Customer() {
        }

        /**
         * @param i   value stored in {@link #ID}
         * @param str value stored in {@link #Name}
         */
        public Customer(int i, String str) {
            ID = i;
            Name = str;
        }

        /** Renders the customer as {@code Customer{ID=.., Name='..'}}. */
        @Override
        public String toString() {
            String idPart = "ID=" + ID;
            String namePart = "Name='" + Name + "'";
            return "Customer{" + idPart + ", " + namePart + "}";
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest$Order.class */
    /** Order tuple that the tests feed into the operator's {@code input2} port. */
    public class Order {
        public int OID;
        public int CID;
        public int Amount;

        /** Creates an order whose fields all keep the Java default of 0. */
        public Order() {
        }

        /**
         * @param i  value stored in {@link #OID}
         * @param i2 value stored in {@link #CID}
         * @param i3 value stored in {@link #Amount}
         */
        public Order(int i, int i2, int i3) {
            OID = i;
            CID = i2;
            Amount = i3;
        }

        /** Renders the order as {@code Order{OID=.., CID=.., Amount=..}}. */
        @Override
        public String toString() {
            return new StringBuilder("Order{")
                    .append("OID=").append(OID)
                    .append(", CID=").append(CID)
                    .append(", Amount=").append(Amount)
                    .append('}')
                    .toString();
        }
    }

    /**
     * Feeds one customer and three orders into the operator without setting a
     * join strategy and expects exactly one joined tuple: customer 1 with the
     * only order whose CID equals the customer's ID.
     *
     * @throws IOException declared by the test signature
     * @throws InterruptedException if the timed wait is interrupted
     */
    @Test
    public void testInnerJoinOperator() throws IOException, InterruptedException {
        Kryo kryo = new Kryo();
        POJOJoinOperator oper = new POJOJoinOperator();
        InMemoryStore store = new InMemoryStore(200L, 200);
        // Each side gets its own copy of the store so the two sides do not share state.
        oper.setLeftStore((JoinStore) kryo.copy(store));
        oper.setRightStore((JoinStore) kryo.copy(store));
        oper.setIncludeFields("ID,Name;OID,Amount");
        oper.setKeyFields("ID,CID");
        oper.outputClass = CustOrder.class;
        oper.setup(MapTimeBasedJoinOperator.context);

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.outputPort.setSink(sink);

        oper.beginWindow(0L);
        Customer customer = new Customer(1, "Anil");
        oper.input1.process(customer);

        // Only this order has CID == customer.ID; it is kept in a local so the
        // assertions below can compare against it.
        Order matchingOrder = new Order(102, 1, 300);
        oper.input2.process(matchingOrder);
        oper.input2.process(new Order(103, 3, 300));
        oper.input2.process(new Order(104, 7, 300));

        // The latch is never counted down, so await() simply blocks for the
        // full timeout before the window is closed.
        CountDownLatch latch = new CountDownLatch(1);
        latch.await(3000L, TimeUnit.MILLISECONDS);
        oper.endWindow();

        Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
        List<?> emittedBatch = (List<?>) sink.collectedTuples.iterator().next();
        CustOrder emitted = (CustOrder) emittedBatch.get(0);
        Assert.assertEquals("value of ID :", customer.ID, emitted.ID);
        Assert.assertEquals("value of Name :", customer.Name, emitted.Name);
        Assert.assertEquals("value of OID: ", matchingOrder.OID, emitted.OID);
        Assert.assertEquals("value of Amount: ", matchingOrder.Amount, emitted.Amount);
    }

    /**
     * Runs the operator with the {@code left_outer_join} strategy over three
     * windows and expects two emitted batches: the matched customer 5 / order
     * 104 pair, followed by the unmatched customer 1 with default order fields.
     *
     * @throws IOException declared by the test signature
     * @throws InterruptedException if one of the timed waits is interrupted
     */
    @Test
    public void testLeftOuterJoinOperator() throws IOException, InterruptedException {
        Kryo kryo = new Kryo();
        POJOJoinOperator oper = new POJOJoinOperator();
        InMemoryStore store = new InMemoryStore(200L, 200);
        oper.setLeftStore((JoinStore) kryo.copy(store));
        oper.setRightStore((JoinStore) kryo.copy(store));
        oper.setIncludeFields("ID,Name;OID,Amount");
        oper.setKeyFields("ID,CID");
        oper.outputClass = CustOrder.class;
        oper.setStrategy("left_outer_join");
        oper.setup(MapTimeBasedJoinOperator.context);

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.outputPort.setSink(sink);

        // The latch is never counted down, so every await() below just blocks
        // for its full timeout (presumably to let stored tuples expire - TODO confirm).
        CountDownLatch latch = new CountDownLatch(1);

        // Window 0: customer 1 has no order with a matching CID.
        oper.beginWindow(0L);
        oper.input1.process(new Customer(1, "Anil"));
        oper.input2.process(new Order(102, 3, 300));
        oper.input2.process(new Order(103, 7, 300));
        oper.endWindow();
        latch.await(500L, TimeUnit.MILLISECONDS);

        // Window 1: order 104 and customer 5 share the key 5.
        oper.beginWindow(1L);
        Order matchingOrder = new Order(104, 5, 300);
        oper.input2.process(matchingOrder);
        Customer customer = new Customer(5, "DT");
        oper.input1.process(customer);
        latch.await(500L, TimeUnit.MILLISECONDS);
        oper.endWindow();
        latch.await(500L, TimeUnit.MILLISECONDS);

        // Window 2: empty window.
        oper.beginWindow(2L);
        oper.endWindow();
        latch.await(5000L, TimeUnit.MILLISECONDS);

        Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size());
        Iterator<Object> batches = sink.collectedTuples.iterator();

        CustOrder joined = (CustOrder) ((List<?>) batches.next()).get(0);
        Assert.assertEquals("value of ID :", customer.ID, joined.ID);
        Assert.assertEquals("value of Name :", customer.Name, joined.Name);
        Assert.assertEquals("value of OID: ", matchingOrder.OID, joined.OID);
        Assert.assertEquals("value of Amount: ", matchingOrder.Amount, joined.Amount);

        CustOrder unmatchedCustomer = (CustOrder) ((List<?>) batches.next()).get(0);
        Assert.assertEquals("Joined Tuple ", "{ID=1, Name='Anil', OID=0, Amount=0}", unmatchedCustomer.toString());
    }

    /**
     * Runs the operator with the {@code right_outer_join} strategy over three
     * windows and expects two emitted batches: the matched customer 5 / order
     * 104 pair, followed by a batch holding the unmatched orders 102 and 103
     * with default customer fields.
     *
     * @throws IOException declared by the test signature
     * @throws InterruptedException if one of the timed waits is interrupted
     */
    @Test
    public void testRightOuterJoinOperator() throws IOException, InterruptedException {
        Kryo kryo = new Kryo();
        POJOJoinOperator oper = new POJOJoinOperator();
        InMemoryStore store = new InMemoryStore(200L, 200);
        oper.setLeftStore((JoinStore) kryo.copy(store));
        oper.setRightStore((JoinStore) kryo.copy(store));
        oper.setIncludeFields("ID,Name;OID,Amount");
        oper.setKeyFields("ID,CID");
        oper.outputClass = CustOrder.class;
        oper.setStrategy("right_outer_join");
        oper.setup(MapTimeBasedJoinOperator.context);

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.outputPort.setSink(sink);

        // The latch is never counted down, so every await() below just blocks
        // for its full timeout (presumably to let stored tuples expire - TODO confirm).
        CountDownLatch latch = new CountDownLatch(1);

        // Window 0: orders 102 and 103 have no customer with a matching ID.
        oper.beginWindow(0L);
        oper.input1.process(new Customer(1, "Anil"));
        oper.input2.process(new Order(102, 3, 300));
        oper.input2.process(new Order(103, 7, 300));
        oper.endWindow();
        latch.await(500L, TimeUnit.MILLISECONDS);

        // Window 1: order 104 and customer 5 share the key 5.
        oper.beginWindow(1L);
        Order matchingOrder = new Order(104, 5, 300);
        oper.input2.process(matchingOrder);
        Customer customer = new Customer(5, "DT");
        oper.input1.process(customer);
        latch.await(500L, TimeUnit.MILLISECONDS);
        oper.endWindow();
        latch.await(500L, TimeUnit.MILLISECONDS);

        // Window 2: empty window.
        oper.beginWindow(2L);
        oper.endWindow();
        latch.await(5000L, TimeUnit.MILLISECONDS);

        Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size());
        Iterator<Object> batches = sink.collectedTuples.iterator();

        CustOrder joined = (CustOrder) ((List<?>) batches.next()).get(0);
        Assert.assertEquals("value of ID :", customer.ID, joined.ID);
        Assert.assertEquals("value of Name :", customer.Name, joined.Name);
        Assert.assertEquals("value of OID: ", matchingOrder.OID, joined.OID);
        Assert.assertEquals("value of Amount: ", matchingOrder.Amount, joined.Amount);

        List<?> unmatchedOrders = (List<?>) batches.next();
        Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=102, Amount=300}", ((CustOrder) unmatchedOrders.get(0)).toString());
        Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=103, Amount=300}", ((CustOrder) unmatchedOrders.get(1)).toString());
    }

    /**
     * Runs the operator with the {@code outer_join} strategy over three windows
     * and expects three emitted batches: the matched customer 5 / order 104
     * pair, the unmatched customer 1 with default order fields, and a batch
     * holding the unmatched orders 102 and 103 with default customer fields.
     *
     * @throws IOException declared by the test signature
     * @throws InterruptedException if one of the timed waits is interrupted
     */
    @Test
    public void testFullOuterJoinOperator() throws IOException, InterruptedException {
        Kryo kryo = new Kryo();
        POJOJoinOperator oper = new POJOJoinOperator();
        InMemoryStore store = new InMemoryStore(200L, 200);
        oper.setLeftStore((JoinStore) kryo.copy(store));
        oper.setRightStore((JoinStore) kryo.copy(store));
        oper.setIncludeFields("ID,Name;OID,Amount");
        oper.setKeyFields("ID,CID");
        oper.outputClass = CustOrder.class;
        oper.setStrategy("outer_join");
        oper.setup(MapTimeBasedJoinOperator.context);

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.outputPort.setSink(sink);

        // The latch is never counted down, so every await() below just blocks
        // for its full timeout (presumably to let stored tuples expire - TODO confirm).
        CountDownLatch latch = new CountDownLatch(1);

        // Window 0: neither the customer nor the two orders find a partner.
        oper.beginWindow(0L);
        oper.input1.process(new Customer(1, "Anil"));
        oper.input2.process(new Order(102, 3, 300));
        oper.input2.process(new Order(103, 7, 300));
        oper.endWindow();
        latch.await(500L, TimeUnit.MILLISECONDS);

        // Window 1: order 104 and customer 5 share the key 5.
        oper.beginWindow(1L);
        Order matchingOrder = new Order(104, 5, 300);
        oper.input2.process(matchingOrder);
        Customer customer = new Customer(5, "DT");
        oper.input1.process(customer);
        latch.await(500L, TimeUnit.MILLISECONDS);
        oper.endWindow();
        latch.await(500L, TimeUnit.MILLISECONDS);

        // Window 2: empty window.
        oper.beginWindow(2L);
        oper.endWindow();
        latch.await(5000L, TimeUnit.MILLISECONDS);

        Assert.assertEquals("Number of tuple emitted ", 3, sink.collectedTuples.size());
        Iterator<Object> batches = sink.collectedTuples.iterator();

        CustOrder joined = (CustOrder) ((List<?>) batches.next()).get(0);
        Assert.assertEquals("value of ID :", customer.ID, joined.ID);
        Assert.assertEquals("value of Name :", customer.Name, joined.Name);
        Assert.assertEquals("value of OID: ", matchingOrder.OID, joined.OID);
        Assert.assertEquals("value of Amount: ", matchingOrder.Amount, joined.Amount);

        CustOrder unmatchedCustomer = (CustOrder) ((List<?>) batches.next()).get(0);
        Assert.assertEquals("Joined Tuple ", "{ID=1, Name='Anil', OID=0, Amount=0}", unmatchedCustomer.toString());

        List<?> unmatchedOrders = (List<?>) batches.next();
        Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=102, Amount=300}", ((CustOrder) unmatchedOrders.get(0)).toString());
        Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=103, Amount=300}", ((CustOrder) unmatchedOrders.get(1)).toString());
    }
}
