package com.datatorrent.contrib.kinesis;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kinesis/ShardManagerTest.class */
public class ShardManagerTest extends KinesisOperatorTestBase {
    /** Logger for this test class. */
    static final Logger logger = LoggerFactory.getLogger(ShardManagerTest.class);
    /** Data tuples gathered by {@code CollectorInputPort}; cleared when that port is connected. */
    static List<String> collectedTuples = new LinkedList<>();
    /** Number of data tuples the tests expect to collect. */
    static final int totalCount = 100;
    /** Latch the tests wait on; counted down by the collector port and by {@code TestShardManager}. */
    static CountDownLatch latch;
    /** Suffix appended to the stream name to form the path of the offset file. */
    static final String OFFSET_FILE = ".offset";

    /* loaded from: input_file:com/datatorrent/contrib/kinesis/ShardManagerTest$CollectorInputPort.class */
    /**
     * Input port that stores every data tuple in {@code ShardManagerTest.collectedTuples}
     * and counts the shared latch down once for each {@code "END_TUPLE"} marker it receives.
     */
    public static class CollectorInputPort extends DefaultInputPort<String> {
        /**
         * Creates the port.
         *
         * @param operator the operator owning this port; it is not used by the port itself
         */
        public CollectorInputPort(Operator operator) {
        }

        /**
         * Handles one incoming tuple: end markers only signal the latch (when one has been
         * set), everything else is appended to the collected tuples.
         *
         * @param str the received tuple
         */
        public void process(String str) {
            final boolean endMarker = str.equals("END_TUPLE");
            if (endMarker) {
                if (ShardManagerTest.latch != null) {
                    ShardManagerTest.latch.countDown();
                }
                return;
            }
            ShardManagerTest.collectedTuples.add(str);
        }

        /**
         * Discards previously collected tuples when the port becomes connected.
         *
         * @param z {@code true} when the port is being connected
         */
        public void setConnected(boolean z) {
            if (!z) {
                return;
            }
            ShardManagerTest.collectedTuples.clear();
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/kinesis/ShardManagerTest$CollectorModule.class */
    /**
     * Sink operator for the test DAG; its only member is the port that collects tuples.
     */
    public static class CollectorModule extends BaseOperator {
        /** Port the input operator's output is streamed into; transient, so not part of the serialized operator state. */
        public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
    }

    /* loaded from: input_file:com/datatorrent/contrib/kinesis/ShardManagerTest$TestShardManager.class */
    /**
     * {@link ShardManager} used by the tests: every position update is merged into the
     * in-memory map, the whole map is then written to a file, and finally the shared test
     * latch is checked for its last count-down.
     */
    public static class TestShardManager extends ShardManager {
        /** Path of the file the positions are written to; must be set before positions are updated. */
        private String filename = null;
        private transient FileSystem fs;
        private transient FileContext fc;

        /**
         * Obtains the file system for a default {@link Configuration} and a file context
         * for the same URI.
         *
         * <p>The constructor is declared explicitly because obtaining these handles throws
         * a checked exception, which an implicit default constructor cannot propagate.
         *
         * @throws IOException if the file system or the file context cannot be obtained
         */
        public TestShardManager() throws IOException {
            this.fs = FileSystem.get(new Configuration());
            this.fc = FileContext.getFileContext(this.fs.getUri());
        }

        /**
         * Merges {@code map} into the known positions and rewrites the position file with
         * one {@code "key, value"} line per entry.
         *
         * @param map positions to merge into the ones already known
         * @throws RuntimeException wrapping any failure while writing or renaming the file
         */
        public void updatePositions(Map<String, String> map) {
            this.shardPos.putAll(map);
            try {
                Path tmpFile = new Path(this.filename + ".tmp");
                Path dataFile = new Path(this.filename);
                // try-with-resources closes the stream even when a write fails.
                try (FSDataOutputStream out = this.fs.create(tmpFile, true)) {
                    for (Map.Entry<String, String> entry : this.shardPos.entrySet()) {
                        out.writeBytes(entry.getKey() + ", " + entry.getValue() + "\n");
                    }
                }
                // Write to a temporary file first, then rename it over the data file, so
                // the data file is only replaced once the new content is fully written.
                this.fc.rename(tmpFile, dataFile, Options.Rename.OVERWRITE);
                countdownLatch();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Issues the final count-down of the shared latch once only one count is left and
         * the number of known positions equals {@code totalCount + 2}.
         */
        private void countdownLatch() {
            CountDownLatch pending = ShardManagerTest.latch;
            // Same null guard as the collector port: no test may have set a latch yet.
            if (pending == null || pending.getCount() != 1) {
                return;
            }
            // NOTE(review): the "+ 2" presumably stands for the END_TUPLE control records;
            // this compares the number of entries in shardPos, not a number of consumed
            // records - confirm that the threshold can actually be reached.
            if (this.shardPos.size() == ShardManagerTest.totalCount + 2) {
                pending.countDown();
            }
        }

        /**
         * @param str path of the file the positions are written to
         */
        public void setFilename(String str) {
            this.filename = str;
        }

        /**
         * @return path of the file the positions are written to, or {@code null} if unset
         */
        public String getFilename() {
            return this.filename;
        }
    }

    /** Creates the test with the inherited {@code hasMultiPartition} flag switched on. */
    public ShardManagerTest() {
        super();
        hasMultiPartition = true;
    }

    /**
     * Runs the partitionable-input scenario with a fresh {@link KinesisConsumer} and removes
     * the offset file afterwards, whether or not the scenario succeeded.
     *
     * @throws Exception if the scenario or the cleanup fails
     */
    @Test
    public void testConsumerUpdateShardPos() throws Exception {
        try {
            testPartitionableInputOperator(new KinesisConsumer());
        } finally {
            // One cleanup path for success and failure; a failing cleanup is not retried.
            cleanFile();
        }
    }

    /**
     * Deletes the offset file ({@code streamName + OFFSET_FILE}) from the file system
     * obtained for a default configuration, using a recursive delete.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the file system cannot
     *         be obtained or the delete fails
     */
    private void cleanFile() {
        try {
            FileSystem fileSystem = FileSystem.get(new Configuration());
            Path offsetFile = new Path(this.streamName + OFFSET_FILE);
            fileSystem.delete(offsetFile, true);
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Runs the test producer configured for {@code totalCount} records, then starts a local
     * DAG that streams a {@link KinesisStringInputOperator} into a {@link CollectorModule}
     * and checks that {@code totalCount} tuples were collected.
     *
     * <p>The input operator is given a {@link TestShardManager} writing to
     * {@code streamName + OFFSET_FILE}. The local cluster is shut down even when the
     * assertion fails.
     *
     * @param kinesisConsumer consumer to install on the input operator; its stream name and
     *        initial offset are set here
     * @throws Exception if setting up or running the DAG fails
     */
    public void testPartitionableInputOperator(KinesisConsumer kinesisConsumer) throws Exception {
        // Three count-downs are expected: the last one comes from TestShardManager (which
        // only fires when a single count is left), the others from END_TUPLE markers.
        latch = new CountDownLatch(3);

        KinesisTestProducer producer = new KinesisTestProducer(this.streamName, true);
        producer.setSendCount(totalCount);
        producer.run();

        LocalMode localMode = LocalMode.newInstance();
        DAG dag = localMode.getDAG();

        KinesisStringInputOperator inputOperator = dag.addOperator("Kinesis consumer", KinesisStringInputOperator.class);
        // Each credential goes to the setter of the same name (they used to be crossed over).
        inputOperator.setAccessKey(this.credentials.getCredentials().getAWSAccessKeyId());
        inputOperator.setSecretKey(this.credentials.getCredentials().getAWSSecretKey());
        inputOperator.setStreamName(this.streamName);

        TestShardManager shardManager = new TestShardManager();
        shardManager.setFilename(this.streamName + OFFSET_FILE);
        inputOperator.setShardManager(shardManager);
        inputOperator.setStrategy(AbstractKinesisInputOperator.PartitionStrategy.MANY_TO_ONE.toString());
        inputOperator.setRepartitionInterval(-1L);

        kinesisConsumer.setStreamName(this.streamName);
        kinesisConsumer.setInitialOffset("earliest");
        inputOperator.setConsumer(kinesisConsumer);

        CollectorModule collector = dag.addOperator("RecordCollector", new CollectorModule());
        dag.addStream("Kinesis Records", inputOperator.outputPort, collector.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);

        LocalMode.Controller controller = localMode.getController();
        controller.setHeartbeatMonitoringEnabled(true);
        controller.runAsync();
        try {
            // The result of await is not checked; the tuple count below decides the outcome.
            latch.await(15000L, TimeUnit.MILLISECONDS);
            Assert.assertEquals("Tuple count", totalCount, collectedTuples.size());
            logger.debug("Number of emitted tuples: {} -> {}", collectedTuples.size(), totalCount);
        } finally {
            // Stop the asynchronously running cluster even when the assertion fails.
            controller.shutdown();
        }
    }

    /**
     * Runs the test producer configured for {@code totalCount} records, then starts a local
     * DAG with a plain {@link ShardManager} and checks the result.
     *
     * <p>The checks are that the shard manager reports two initial shard positions, both
     * with a non-empty value, and that {@code totalCount} tuples were collected. The local
     * cluster is shut down even when an assertion fails.
     *
     * @throws Exception if setting up or running the DAG fails
     */
    @Test
    public void testShardManager() throws Exception {
        // NOTE(review): nothing visible in this test issues a third count-down when a plain
        // ShardManager is used, so the wait below presumably runs to its timeout - confirm.
        latch = new CountDownLatch(3);

        KinesisTestProducer producer = new KinesisTestProducer(this.streamName, true);
        producer.setSendCount(totalCount);
        producer.run();

        LocalMode localMode = LocalMode.newInstance();
        DAG dag = localMode.getDAG();
        KinesisUtil.getInstance().setClient(this.client);

        KinesisStringInputOperator inputOperator = dag.addOperator("Kinesis consumer", KinesisStringInputOperator.class);
        // Each credential goes to the setter of the same name (they used to be crossed over).
        inputOperator.setAccessKey(this.credentials.getCredentials().getAWSAccessKeyId());
        inputOperator.setSecretKey(this.credentials.getCredentials().getAWSSecretKey());
        inputOperator.setStreamName(this.streamName);
        inputOperator.setShardManager(new ShardManager());
        inputOperator.setStrategy(AbstractKinesisInputOperator.PartitionStrategy.MANY_TO_ONE.toString());
        inputOperator.setRepartitionInterval(-1L);

        KinesisConsumer kinesisConsumer = new KinesisConsumer();
        kinesisConsumer.setStreamName(this.streamName);
        kinesisConsumer.setInitialOffset("earliest");
        inputOperator.setConsumer(kinesisConsumer);

        CollectorModule collector = dag.addOperator("RecordCollector", new CollectorModule());
        dag.addStream("Kinesis Records", inputOperator.outputPort, collector.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);

        LocalMode.Controller controller = localMode.getController();
        controller.setHeartbeatMonitoringEnabled(true);
        controller.runAsync();
        try {
            // The result of await is not checked; the assertions below decide the outcome.
            latch.await(10000L, TimeUnit.MILLISECONDS);

            // Load the positions once so the size check and the iteration see the same map.
            Map<?, ?> positions = inputOperator.getShardManager().loadInitialShardPositions();
            Assert.assertEquals("ShardPos Size", 2L, positions.size());
            Iterator<? extends Map.Entry<?, ?>> entries = positions.entrySet().iterator();
            Assert.assertNotEquals("Record Seq No in Shard Id 1", "", entries.next().getValue());
            Assert.assertNotEquals("Record Seq No in Shard Id 2", "", entries.next().getValue());

            Assert.assertEquals("Tuple count", totalCount, collectedTuples.size());
            logger.debug("Number of emitted tuples: {} -> {}", collectedTuples.size(), totalCount);
        } finally {
            // Stop the asynchronously running cluster even when an assertion fails.
            controller.shutdown();
        }
    }
}
