package com.datatorrent.contrib.redis;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KeyValPair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.junit.Assert;
import org.junit.Test;
import redis.clients.jedis.ScanParams;

/* loaded from: input_file:com/datatorrent/contrib/redis/RedisInputOperatorTest.class */
public class RedisInputOperatorTest {
    private RedisStore operatorStore;
    private RedisStore testStore;

    /* loaded from: input_file:com/datatorrent/contrib/redis/RedisInputOperatorTest$CollectorModule.class */
    public static class CollectorModule extends BaseOperator {
        static volatile List<KeyValPair<String, String>> resultMap = new ArrayList();
        static long resultCount = 0;
        public final transient DefaultInputPort<KeyValPair<String, String>> inputPort = new DefaultInputPort<KeyValPair<String, String>>() { // from class: com.datatorrent.contrib.redis.RedisInputOperatorTest.CollectorModule.1
            public void process(KeyValPair<String, String> keyValPair) {
                CollectorModule.resultMap.add(keyValPair);
                CollectorModule.resultCount++;
            }
        };
    }

    /* JADX WARN: Type inference failed for: r0v39, types: [com.datatorrent.contrib.redis.RedisInputOperatorTest$1] */
    @Test
    public void testIntputOperator() throws IOException {
        this.operatorStore = new RedisStore();
        this.testStore = new RedisStore();
        this.testStore.connect();
        new ScanParams().count(1);
        this.testStore.put("test_abc", "789");
        this.testStore.put("test_def", "456");
        this.testStore.put("test_ghi", "123");
        try {
            LocalMode newInstance = LocalMode.newInstance();
            DAG dag = newInstance.getDAG();
            RedisKeyValueInputOperator addOperator = dag.addOperator("input", new RedisKeyValueInputOperator());
            CollectorModule addOperator2 = dag.addOperator("collector", new CollectorModule());
            addOperator.setStore(this.operatorStore);
            dag.addStream("stream", addOperator.outputPort, addOperator2.inputPort);
            final LocalMode.Controller controller = newInstance.getController();
            new Thread("LocalClusterController") { // from class: com.datatorrent.contrib.redis.RedisInputOperatorTest.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    long currentTimeMillis = System.currentTimeMillis();
                    try {
                        Thread.sleep(1000L);
                        while (System.currentTimeMillis() - currentTimeMillis < 50000 && CollectorModule.resultMap.size() < 3) {
                            Thread.sleep(10L);
                        }
                    } catch (InterruptedException e) {
                    }
                    controller.shutdown();
                }
            }.start();
            controller.run();
            Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair("test_abc", "789")));
            Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair("test_def", "456")));
            Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair("test_ghi", "123")));
            Iterator<KeyValPair<String, String>> it = CollectorModule.resultMap.iterator();
            while (it.hasNext()) {
                this.testStore.remove(it.next().getKey());
            }
            this.testStore.disconnect();
        } catch (Throwable th) {
            Iterator<KeyValPair<String, String>> it2 = CollectorModule.resultMap.iterator();
            while (it2.hasNext()) {
                this.testStore.remove(it2.next().getKey());
            }
            this.testStore.disconnect();
            throw th;
        }
    }

    @Test
    public void testRecoveryAndIdempotency() throws Exception {
        this.operatorStore = new RedisStore();
        this.testStore = new RedisStore();
        this.testStore.connect();
        new ScanParams().count(1);
        this.testStore.put("test_abc", "789");
        this.testStore.put("test_def", "456");
        this.testStore.put("test_ghi", "123");
        RedisKeyValueInputOperator redisKeyValueInputOperator = new RedisKeyValueInputOperator();
        redisKeyValueInputOperator.setWindowDataManager(new FSWindowDataManager());
        redisKeyValueInputOperator.setStore(this.operatorStore);
        redisKeyValueInputOperator.setScanCount(1);
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        redisKeyValueInputOperator.outputPort.setSink(collectorTestSink);
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, defaultAttributeMap);
        try {
            redisKeyValueInputOperator.setup(testIdOperatorContext);
            redisKeyValueInputOperator.beginWindow(1L);
            redisKeyValueInputOperator.emitTuples();
            redisKeyValueInputOperator.endWindow();
            int size = collectorTestSink.collectedTuples.size();
            collectorTestSink.collectedTuples.clear();
            redisKeyValueInputOperator.beginWindow(2L);
            redisKeyValueInputOperator.emitTuples();
            redisKeyValueInputOperator.endWindow();
            int size2 = collectorTestSink.collectedTuples.size();
            collectorTestSink.collectedTuples.clear();
            redisKeyValueInputOperator = new RedisKeyValueInputOperator();
            redisKeyValueInputOperator.setWindowDataManager(new FSWindowDataManager());
            redisKeyValueInputOperator.setStore(this.operatorStore);
            redisKeyValueInputOperator.setScanCount(1);
            redisKeyValueInputOperator.outputPort.setSink(collectorTestSink);
            redisKeyValueInputOperator.setup(testIdOperatorContext);
            Assert.assertEquals("largest recovery window", 2L, redisKeyValueInputOperator.getWindowDataManager().getLargestCompletedWindow());
            redisKeyValueInputOperator.beginWindow(1L);
            redisKeyValueInputOperator.emitTuples();
            redisKeyValueInputOperator.emitTuples();
            redisKeyValueInputOperator.endWindow();
            Assert.assertEquals("num of messages in window 1", size, collectorTestSink.collectedTuples.size());
            collectorTestSink.collectedTuples.clear();
            redisKeyValueInputOperator.beginWindow(2L);
            redisKeyValueInputOperator.emitTuples();
            redisKeyValueInputOperator.endWindow();
            Assert.assertEquals("num of messages in window 2", size2, collectorTestSink.collectedTuples.size());
            Iterator it = collectorTestSink.collectedTuples.iterator();
            while (it.hasNext()) {
                this.testStore.remove(((KeyValPair) it.next()).getKey());
            }
            collectorTestSink.collectedTuples.clear();
            redisKeyValueInputOperator.getWindowDataManager().committed(5L);
            redisKeyValueInputOperator.teardown();
        } catch (Throwable th) {
            Iterator it2 = collectorTestSink.collectedTuples.iterator();
            while (it2.hasNext()) {
                this.testStore.remove(((KeyValPair) it2.next()).getKey());
            }
            collectorTestSink.collectedTuples.clear();
            redisKeyValueInputOperator.getWindowDataManager().committed(5L);
            redisKeyValueInputOperator.teardown();
            throw th;
        }
    }
}
