package com.datatorrent.lib.db;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.db.KeyValueStore;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;

/* loaded from: input_file:com/datatorrent/lib/db/KeyValueStoreOperatorTest.class */
public class KeyValueStoreOperatorTest<S extends KeyValueStore> {
    protected S operatorStore;
    protected S testStore;

    /* loaded from: input_file:com/datatorrent/lib/db/KeyValueStoreOperatorTest$CollectorModule.class */
    public static class CollectorModule<T> extends BaseOperator {
        static Map<String, String> resultMap = new HashMap();
        static long resultCount = 0;
        public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() { // from class: com.datatorrent.lib.db.KeyValueStoreOperatorTest.CollectorModule.1
            public void process(T t) {
                CollectorModule.resultMap.putAll((Map) t);
                CollectorModule.resultCount++;
            }
        };
    }

    /* loaded from: input_file:com/datatorrent/lib/db/KeyValueStoreOperatorTest$InputOperator.class */
    protected static class InputOperator<S2 extends KeyValueStore> extends AbstractKeyValueStoreInputOperator<Map<String, String>, S2> {
        protected InputOperator() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        public Map<String, String> convertToTuple(Map<Object, Object> map) {
            return map;
        }

        /* renamed from: convertToTuple, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m23convertToTuple(Map map) {
            return convertToTuple((Map<Object, Object>) map);
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/db/KeyValueStoreOperatorTest$OutputOperator.class */
    protected static class OutputOperator<S2 extends KeyValueStore> extends AbstractStoreOutputOperator<Map<String, String>, S2> {
        protected OutputOperator() {
        }

        public void processTuple(Map<String, String> map) {
            this.store.putAll(map);
        }
    }

    public KeyValueStoreOperatorTest(S s, S s2) {
        this.operatorStore = s;
        this.testStore = s2;
    }

    public void testInputOperator() throws Exception {
        this.testStore.connect();
        this.testStore.put("test_abc", "789");
        this.testStore.put("test_def", "456");
        this.testStore.put("test_ghi", "123");
        try {
            LocalMode newInstance = LocalMode.newInstance();
            DAG dag = newInstance.getDAG();
            InputOperator addOperator = dag.addOperator("input", new InputOperator());
            CollectorModule addOperator2 = dag.addOperator("collector", new CollectorModule());
            addOperator.addKey("test_abc");
            addOperator.addKey("test_def");
            addOperator.addKey("test_ghi");
            addOperator.setStore(this.operatorStore);
            dag.addStream("stream", addOperator.outputPort, addOperator2.inputPort);
            LocalMode.Controller controller = newInstance.getController();
            controller.run(3000L);
            controller.shutdown();
            Assert.assertEquals("789", CollectorModule.resultMap.get("test_abc"));
            Assert.assertEquals("456", CollectorModule.resultMap.get("test_def"));
            Assert.assertEquals("123", CollectorModule.resultMap.get("test_ghi"));
            this.testStore.remove("test_abc");
            this.testStore.remove("test_def");
            this.testStore.remove("test_ghi");
            this.testStore.disconnect();
        } catch (Throwable th) {
            this.testStore.remove("test_abc");
            this.testStore.remove("test_def");
            this.testStore.remove("test_ghi");
            this.testStore.disconnect();
            throw th;
        }
    }

    public void testOutputOperator() throws IOException {
        OutputOperator outputOperator = new OutputOperator();
        try {
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap.put(DAG.APPLICATION_ID, "test_appid");
            outputOperator.setStore(this.operatorStore);
            outputOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(0, defaultAttributeMap));
            outputOperator.beginWindow(100L);
            HashMap hashMap = new HashMap();
            hashMap.put("test_abc", "123");
            hashMap.put("test_def", "456");
            outputOperator.input.process(hashMap);
            HashMap hashMap2 = new HashMap();
            hashMap2.put("test_ghi", "789");
            outputOperator.input.process(hashMap2);
            outputOperator.endWindow();
            outputOperator.teardown();
            this.testStore.connect();
            Assert.assertEquals("123", this.testStore.get("test_abc"));
            Assert.assertEquals("456", this.testStore.get("test_def"));
            Assert.assertEquals("789", this.testStore.get("test_ghi"));
            this.testStore.remove("test_abc");
            this.testStore.remove("test_def");
            this.testStore.remove("test_ghi");
            this.testStore.disconnect();
        } catch (Throwable th) {
            this.testStore.remove("test_abc");
            this.testStore.remove("test_def");
            this.testStore.remove("test_ghi");
            this.testStore.disconnect();
            throw th;
        }
    }
}
